diff --git a/.github/workflows/deployment-staging.yml b/.github/workflows/deployment-staging.yml index a3b7fdd..c6d5732 100644 --- a/.github/workflows/deployment-staging.yml +++ b/.github/workflows/deployment-staging.yml @@ -36,12 +36,15 @@ jobs: - name: Deploy lambdas run: | LAMBDAS="get-bnas + get-bnas-analysis get-bnas-cities get-bnas-results get-cities get-cities-bnas get-cities-submissions + patch-bnas-analysis patch-cities-submissions + post-bnas-analysis post-bnas-enqueue post-cities-submissions" echo $LAMBDAS \ diff --git a/docs/database.dbml b/docs/database.dbml index ccf8f3c..b462354 100644 --- a/docs/database.dbml +++ b/docs/database.dbml @@ -187,3 +187,31 @@ Ref: bna.summary.bna_uuid - bna.opportunity.bna_uuid Ref: bna.summary.bna_uuid - bna.recreation.bna_uuid Ref: bna.summary.bna_uuid - bna.infrastructure.bna_uuid Ref: bna.summary.bna_uuid - bna.features.bna_uuid + +enum brokenspoke_status { + pending + started + complete +} + +enum brokenspoke_state { + pipeline + sqs_message + setup + analysis + export +} + +Table bna.brokenspoke_pipeline { + brokenspoke_pipeline_id int [pk, increment] + state brokenspoke_state + state_machine_id uuid + sqs_message json + neon_branch_id varchar(50) + fargate_task_id uuid + s3_bucket varchar(50) + + indexes { + broken_spoke_pipeline_id + } +} diff --git a/effortless/src/api.rs b/effortless/src/api.rs index 63552e8..3e1d9fc 100644 --- a/effortless/src/api.rs +++ b/effortless/src/api.rs @@ -2,8 +2,8 @@ use crate::{ error::{APIError, APIErrorSource, APIErrors}, fragment::{self, get_apigw_request_id}, }; -use lambda_http::{http::StatusCode, Request}; -use serde::Deserialize; +use lambda_http::{http::StatusCode, Request, RequestPayloadExt}; +use serde::de::DeserializeOwned; use std::{fmt::Display, str::FromStr}; /// Parse the first path parameter found in the API Gateway request, into the provided type. @@ -87,7 +87,7 @@ where /// /// ```rust /// use effortless::api::parse_request_body; -/// use lambda_http::{Request, RequestExt}; +/// use lambda_http::{http::{self, StatusCode}, Body, Request, RequestExt}; /// /// use serde::Deserialize; /// @@ -97,31 +97,22 @@ where /// pub last_name: String /// } /// -/// let event = Request::new("{\n \"first_name\": \"Rosa\",\n \"last_name\": \"Maria\"\n}".into()) -/// .with_request_context(lambda_http::request::RequestContext::ApiGatewayV2( -/// lambda_http::aws_lambda_events::apigw::ApiGatewayV2httpRequestContext::default(), -/// )); +/// let event = http::Request::builder() +/// .header(http::header::CONTENT_TYPE, "application/json") +/// .body(Body::from(r#"{"first_name": "Rosa","last_name": "Maria"}"#)) +/// .expect("failed to build request"); /// let person = parse_request_body::(&event).unwrap(); /// assert_eq!(person.first_name, "Rosa"); /// assert_eq!(person.last_name, "Maria"); /// ``` pub fn parse_request_body(event: &Request) -> Result where - T: for<'a> Deserialize<'a>, + T: DeserializeOwned, { - match fragment::parse_request_body::(event) { - Ok(o) => Ok(o), - Err(e) => { - let api_error = APIError::new( - get_apigw_request_id(event), - StatusCode::BAD_REQUEST, - "Invalid Body", - e.to_string().as_str(), - None, - ); - Err(APIErrors::new(&[api_error])) - } - } + let payload = event + .payload::() + .map_err(|e| invalid_body(event, e.to_string().as_str()))?; + Ok(payload.ok_or_else(|| invalid_body(event, "No request body was provided."))?) } /// Create an APIError representing an item not found error. @@ -176,3 +167,12 @@ pub fn invalid_body(event: &Request, details: &str) -> APIError { None, ) } + +/// Create and APIError from and API Gateway event, representing a parameter issue. +pub fn invalid_path_parameter(event: &Request, parameter: &str, details: &str) -> APIError { + APIError::with_pointer( + get_apigw_request_id(event), + parameter, + format!("invalid path parameter `{parameter}`: {details}").as_str(), + ) +} diff --git a/effortless/src/fragment.rs b/effortless/src/fragment.rs index 7582f16..5270775 100644 --- a/effortless/src/fragment.rs +++ b/effortless/src/fragment.rs @@ -1,4 +1,4 @@ -use lambda_http::{aws_lambda_events::query_map::QueryMap, Request, RequestExt}; +use lambda_http::{aws_lambda_events::query_map::QueryMap, http, Request, RequestExt}; use serde::Deserialize; use std::str::FromStr; @@ -6,16 +6,11 @@ fn parse_parameter(qm: &QueryMap, parameter: &str) -> Option { - let parsed_parameter = parameter_str.parse::(); - match parsed_parameter { - Ok(param) => Some(Ok(param)), - Err(e) => Some(Err(e)), - } - } - None => None, - } + qm.first(parameter) + .map(|parameter_str| match parameter_str.parse::() { + Ok(param) => Some(Ok(param)), + Err(e) => Some(Err(e)), + })? } /// Parse the first matching path parameter into the provided type. @@ -104,3 +99,73 @@ pub fn get_apigw_request_id(event: &Request) -> Option { _ => None, } } + +/// Attempt to create an extension trait for [`lambda_http::Request`]. +pub trait BnaRequestExt { + /// Return the first matching parameter from the QueryMap, deserialized into its type T, if it exists. + fn parse_parameter(qm: &QueryMap, parameter: &str) -> Option::Err>> + where + T: FromStr, + { + qm.first(parameter) + .map(|parameter_str| match parameter_str.parse::() { + Ok(param) => Some(Ok(param)), + Err(e) => Some(Err(e)), + })? + } + + /// Return the first matching path parameter if it exists. + fn first_path_parameter(&self, parameter: &str) -> Option; + + /// Return the first matching query string parameter if it exists. + fn first_query_string_parameter(&self, parameter: &str) -> Option; + + /// Return the first matching path parameter deserialized into its type T, if it exists. + fn path_parameter(&self, parameter: &str) -> Option> + where + T: FromStr; + + /// Return the first matching path parameter deserialized into its type T, if it exists. + fn query_string_parameter(&self, parameter: &str) -> Option> + where + T: FromStr; + + /// Returns the Api Gateway Request ID from an ApiGatewayV2 event. + /// + /// If there is no request ID or the event is not coming from an ApiGatewayV2, the + /// function returns None. + fn apigw_request_id(&self) -> Option; +} + +impl BnaRequestExt for http::Request { + fn first_path_parameter(&self, parameter: &str) -> Option { + self.path_parameters().first(parameter).map(String::from) + } + + fn first_query_string_parameter(&self, parameter: &str) -> Option { + self.query_string_parameters() + .first(parameter) + .map(String::from) + } + + fn path_parameter(&self, parameter: &str) -> Option::Err>> + where + T: FromStr, + { + Self::parse_parameter::(&self.path_parameters(), parameter) + } + + fn query_string_parameter(&self, parameter: &str) -> Option::Err>> + where + T: FromStr, + { + Self::parse_parameter::(&self.query_string_parameters(), parameter) + } + + fn apigw_request_id(&self) -> Option { + match self.request_context() { + lambda_http::request::RequestContext::ApiGatewayV2(payload) => payload.request_id, + _ => None, + } + } +} diff --git a/entity/src/entities/brokenspoke_pipeline.rs b/entity/src/entities/brokenspoke_pipeline.rs new file mode 100644 index 0000000..0a1c174 --- /dev/null +++ b/entity/src/entities/brokenspoke_pipeline.rs @@ -0,0 +1,23 @@ +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.3 + +use super::sea_orm_active_enums::BrokenspokeState; +use sea_orm::entity::prelude::*; +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] +#[sea_orm(table_name = "brokenspoke_pipeline")] +pub struct Model { + #[sea_orm(primary_key)] + pub id: i32, + pub state: Option, + pub state_machine_id: Option, + pub sqs_message: Option, + pub neon_branch_id: Option, + pub fargate_task_id: Option, + pub s3_bucket: Option, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/entity/src/entities/mod.rs b/entity/src/entities/mod.rs index e7febd5..3ccd2a6 100644 --- a/entity/src/entities/mod.rs +++ b/entity/src/entities/mod.rs @@ -2,6 +2,7 @@ pub mod prelude; +pub mod brokenspoke_pipeline; pub mod census; pub mod city; pub mod core_services; diff --git a/entity/src/entities/prelude.rs b/entity/src/entities/prelude.rs index 5d05955..354995c 100644 --- a/entity/src/entities/prelude.rs +++ b/entity/src/entities/prelude.rs @@ -1,5 +1,6 @@ //! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.3 +pub use super::brokenspoke_pipeline::Entity as BrokenspokePipeline; pub use super::census::Entity as Census; pub use super::city::Entity as City; pub use super::core_services::Entity as CoreServices; diff --git a/entity/src/entities/sea_orm_active_enums.rs b/entity/src/entities/sea_orm_active_enums.rs index bb3f055..139d40d 100644 --- a/entity/src/entities/sea_orm_active_enums.rs +++ b/entity/src/entities/sea_orm_active_enums.rs @@ -13,3 +13,17 @@ pub enum ApprovalStatus { #[sea_orm(string_value = "Rejected")] Rejected, } +#[derive(Debug, Clone, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)] +#[sea_orm(rs_type = "String", db_type = "Enum", enum_name = "brokenspoke_state")] +pub enum BrokenspokeState { + #[sea_orm(string_value = "analysis")] + Analysis, + #[sea_orm(string_value = "export")] + Export, + #[sea_orm(string_value = "pipeline")] + Pipeline, + #[sea_orm(string_value = "setup")] + Setup, + #[sea_orm(string_value = "sqs_message")] + SqsMessage, +} diff --git a/entity/src/wrappers/mod.rs b/entity/src/wrappers/mod.rs index 606edc4..d7f5cdf 100644 --- a/entity/src/wrappers/mod.rs +++ b/entity/src/wrappers/mod.rs @@ -1,8 +1,10 @@ use std::str::FromStr; -use crate::entities::sea_orm_active_enums; -use crate::entities::submission; -use sea_orm::{ActiveValue, IntoActiveModel}; +use crate::entities::{brokenspoke_pipeline, sea_orm_active_enums, submission}; +use sea_orm::{ + prelude::{Json, Uuid}, + ActiveValue, IntoActiveModel, +}; use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] @@ -39,9 +41,50 @@ impl FromStr for ApprovalStatus { serde_plain::from_str::(s) } } + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub enum BrokenspokeState { + Analysis, + Export, + Pipeline, + Setup, + SqsMessage, +} + +impl From for BrokenspokeState { + fn from(value: sea_orm_active_enums::BrokenspokeState) -> Self { + match value { + sea_orm_active_enums::BrokenspokeState::Analysis => Self::Analysis, + sea_orm_active_enums::BrokenspokeState::Export => Self::Export, + sea_orm_active_enums::BrokenspokeState::Pipeline => Self::Pipeline, + sea_orm_active_enums::BrokenspokeState::Setup => Self::Setup, + sea_orm_active_enums::BrokenspokeState::SqsMessage => Self::SqsMessage, + } + } +} + +impl From for sea_orm_active_enums::BrokenspokeState { + fn from(val: BrokenspokeState) -> Self { + match val { + BrokenspokeState::Analysis => sea_orm_active_enums::BrokenspokeState::Analysis, + BrokenspokeState::Export => sea_orm_active_enums::BrokenspokeState::Export, + BrokenspokeState::Pipeline => sea_orm_active_enums::BrokenspokeState::Pipeline, + BrokenspokeState::Setup => sea_orm_active_enums::BrokenspokeState::Setup, + BrokenspokeState::SqsMessage => sea_orm_active_enums::BrokenspokeState::SqsMessage, + } + } +} + +impl FromStr for BrokenspokeState { + type Err = serde_plain::Error; + + fn from_str(s: &str) -> Result { + serde_plain::from_str::(s) + } +} + #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct SubmissionPost { - pub id: Option, pub first_name: String, pub last_name: String, pub title: Option, @@ -58,7 +101,7 @@ pub struct SubmissionPost { impl IntoActiveModel for SubmissionPost { fn into_active_model(self) -> submission::ActiveModel { submission::ActiveModel { - id: self.id.map_or(ActiveValue::NotSet, ActiveValue::Set), + id: ActiveValue::NotSet, first_name: ActiveValue::Set(self.first_name), last_name: ActiveValue::Set(self.last_name), title: ActiveValue::Set(self.title), @@ -76,7 +119,6 @@ impl IntoActiveModel for SubmissionPost { #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct SubmissionPatch { - pub id: Option, pub first_name: Option, pub last_name: Option, pub title: Option>, @@ -93,7 +135,7 @@ pub struct SubmissionPatch { impl IntoActiveModel for SubmissionPatch { fn into_active_model(self) -> submission::ActiveModel { submission::ActiveModel { - id: self.id.map_or(ActiveValue::NotSet, ActiveValue::Unchanged), + id: ActiveValue::NotSet, first_name: self .first_name .map_or(ActiveValue::NotSet, ActiveValue::Set), @@ -113,6 +155,62 @@ impl IntoActiveModel for SubmissionPatch { } } +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct BrokenspokePipelinePost { + pub state: Option, + pub state_machine_id: Option, + pub sqs_message: Option, + pub neon_branch_id: Option, + pub fargate_task_id: Option, + pub s3_bucket: Option, +} + +impl IntoActiveModel for BrokenspokePipelinePost { + fn into_active_model(self) -> brokenspoke_pipeline::ActiveModel { + brokenspoke_pipeline::ActiveModel { + id: ActiveValue::NotSet, + state: ActiveValue::Set(self.state), + state_machine_id: ActiveValue::Set(self.state_machine_id), + sqs_message: ActiveValue::Set(self.sqs_message), + neon_branch_id: ActiveValue::Set(self.neon_branch_id), + fargate_task_id: ActiveValue::Set(self.fargate_task_id), + s3_bucket: ActiveValue::Set(self.s3_bucket), + } + } +} + +#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct BrokenspokePipelinePatch { + pub state: Option>, + pub state_machine_id: Option>, + pub sqs_message: Option>, + pub neon_branch_id: Option>, + pub fargate_task_id: Option>, + pub s3_bucket: Option>, +} + +impl IntoActiveModel for BrokenspokePipelinePatch { + fn into_active_model(self) -> brokenspoke_pipeline::ActiveModel { + brokenspoke_pipeline::ActiveModel { + id: ActiveValue::NotSet, + state: self.state.map_or(ActiveValue::NotSet, ActiveValue::Set), + state_machine_id: self + .state_machine_id + .map_or(ActiveValue::NotSet, ActiveValue::Set), + sqs_message: self + .sqs_message + .map_or(ActiveValue::NotSet, ActiveValue::Set), + neon_branch_id: self + .neon_branch_id + .map_or(ActiveValue::NotSet, ActiveValue::Set), + fargate_task_id: self + .fargate_task_id + .map_or(ActiveValue::NotSet, ActiveValue::Set), + s3_bucket: self.s3_bucket.map_or(ActiveValue::NotSet, ActiveValue::Set), + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -131,7 +229,6 @@ mod tests { let consent = true; let status = None; let wrapper = SubmissionPost { - id: None, first_name: first_name.clone(), last_name: last_name.clone(), title: title.clone(), @@ -176,7 +273,6 @@ mod tests { let consent = true; let status = Some(sea_orm_active_enums::ApprovalStatus::Approved); let wrapper = SubmissionPost { - id: None, first_name: first_name.clone(), last_name: last_name.clone(), title: title.clone(), @@ -212,7 +308,6 @@ mod tests { let id = 42; let first_name = "John".to_string(); let wrapper = SubmissionPatch { - id: Some(id), first_name: Some(first_name.clone()), last_name: None, title: None, diff --git a/lambdas/Cargo.toml b/lambdas/Cargo.toml index 2dbb277..5531d13 100644 --- a/lambdas/Cargo.toml +++ b/lambdas/Cargo.toml @@ -38,6 +38,10 @@ url = "2.3.1" name = "get-bnas" path = "src/bnas/get-bnas.rs" +[[bin]] +name = "get-bnas-analysis" +path = "src/bnas-analysis/get-bnas-analysis.rs" + [[bin]] name = "get-bnas-cities" path = "src/bnas/get-bnas-cities.rs" @@ -54,6 +58,10 @@ path = "src/cities/get-cities-bnas.rs" name = "get-cities-submissions" path = "src/cities-submissions/get-cities-submissions.rs" +[[bin]] +name = "patch-bnas-analysis" +path = "src/bnas-analysis/patch-bnas-analysis.rs" + [[bin]] name = "patch-cities-submissions" path = "src/cities-submissions/patch-cities-submissions.rs" @@ -62,6 +70,10 @@ path = "src/cities-submissions/patch-cities-submissions.rs" name = "post-cities-submissions" path = "src/cities-submissions/post-cities-submissions.rs" +[[bin]] +name = "post-bnas-analysis" +path = "src/bnas-analysis/post-bnas-analysis.rs" + [[bin]] name = "post-bnas-enqueue" path = "src/bnas-enqueue/post-bnas-enqueue.rs" diff --git a/lambdas/src/bnas-analysis/get-bnas-analysis.rs b/lambdas/src/bnas-analysis/get-bnas-analysis.rs new file mode 100644 index 0000000..66cdb44 --- /dev/null +++ b/lambdas/src/bnas-analysis/get-bnas-analysis.rs @@ -0,0 +1,86 @@ +use dotenv::dotenv; +use effortless::{ + api::{entry_not_found, invalid_path_parameter}, + error::APIError, + fragment::{get_apigw_request_id, BnaRequestExt}, +}; +use entity::prelude::*; +use lambda_http::{run, service_fn, Body, Error, IntoResponse, Request, Response}; +use lambdas::{api_database_connect, build_paginated_response, pagination_parameters}; +use sea_orm::{EntityTrait, PaginatorTrait}; +use serde_json::json; +use tracing::debug; + +async fn function_handler(event: Request) -> Result, Error> { + dotenv().ok(); + + // Retrieve pagination parameters if any. + let (page_size, page) = match pagination_parameters(&event) { + Ok((page_size, page)) => (page_size, page), + Err(e) => return Ok(e), + }; + + // Retrieve the ID of the entry to update. + let parameter = "id"; + let id = event.path_parameter::(parameter); + + // Set the database connection. + let db = match api_database_connect(&event).await { + Ok(db) => db, + Err(e) => return Ok(e), + }; + + // Retrieve all entries or a specific one. + debug!("Processing the requests..."); + let res = match id { + Some(id) => match id { + Ok(id) => { + let model = BrokenspokePipeline::find_by_id(id).one(&db).await?; + let res: Response = match model { + Some(model) => json!(model).into_response().await, + None => entry_not_found(&event).into(), + }; + res + } + Err(e) => { + return Ok( + invalid_path_parameter(&event, parameter, e.to_string().as_str()).into(), + ); + } + }, + None => { + let query = BrokenspokePipeline::find() + .paginate(&db, page_size) + .fetch_page(page - 1) + .await; + let res: Response = match query { + Ok(models) => { + let total_items = BrokenspokePipeline::find().count(&db).await?; + build_paginated_response(json!(models), total_items, page, page_size, &event)? + } + Err(e) => APIError::with_pointer( + get_apigw_request_id(&event), + event.uri().path().to_string().as_str(), + e.to_string().as_str(), + ) + .into(), + }; + res + } + }; + + Ok(res) +} + +#[tokio::main] +async fn main() -> Result<(), Error> { + tracing_subscriber::fmt() + .with_max_level(tracing::Level::INFO) + // disable printing the name of the module in every log line. + .with_target(false) + // disabling time is handy because CloudWatch will add the ingestion time. + .without_time() + .init(); + + run(service_fn(function_handler)).await +} diff --git a/lambdas/src/bnas-analysis/patch-bnas-analysis.rs b/lambdas/src/bnas-analysis/patch-bnas-analysis.rs new file mode 100644 index 0000000..1fa81cf --- /dev/null +++ b/lambdas/src/bnas-analysis/patch-bnas-analysis.rs @@ -0,0 +1,64 @@ +use dotenv::dotenv; +use effortless::{ + api::{invalid_path_parameter, missing_parameter, parse_request_body}, + error::APIErrors, + fragment::BnaRequestExt, +}; +use entity::{brokenspoke_pipeline::ActiveModel, prelude::*, wrappers}; +use lambda_http::{run, service_fn, Body, Error, IntoResponse, Request, Response}; +use lambdas::database_connect; +use sea_orm::{ActiveValue, EntityTrait, IntoActiveModel}; +use serde_json::json; +use tracing::info; + +async fn function_handler(event: Request) -> Result, Error> { + dotenv().ok(); + + // Prepare the model to update. + let active_model = match prepare_active_model(&event) { + Ok(model) => model, + Err(e) => return Ok(e.into()), + }; + + info!( + "updating Submission {:?} into database: {:?}", + active_model.id, active_model + ); + + // Get the database connection. + let db = database_connect(Some("DATABASE_URL_SECRET_ID")).await?; + + // Update the entry. + let res = BrokenspokePipeline::update(active_model).exec(&db).await?; + Ok(json!(res.id).into_response().await) +} + +pub fn prepare_active_model(event: &Request) -> Result { + // Retrieve the ID of the entry to update. + let parameter = "id"; + let id = event + .path_parameter::(parameter) + .ok_or(missing_parameter(event, parameter))? + .map_err(|e| invalid_path_parameter(event, parameter, e.to_string().as_str()))?; + + // Extract and deserialize the data. + let wrapper = parse_request_body::(event)?; + + // Turn the wrapper into an active model. + let mut active_model = wrapper.into_active_model(); + active_model.id = ActiveValue::Unchanged(id); + Ok(active_model) +} + +#[tokio::main] +async fn main() -> Result<(), Error> { + tracing_subscriber::fmt() + .with_max_level(tracing::Level::INFO) + // disable printing the name of the module in every log line. + .with_target(false) + // disabling time is handy because CloudWatch will add the ingestion time. + .without_time() + .init(); + + run(service_fn(function_handler)).await +} diff --git a/lambdas/src/bnas-analysis/post-bnas-analysis.rs b/lambdas/src/bnas-analysis/post-bnas-analysis.rs new file mode 100644 index 0000000..4836a24 --- /dev/null +++ b/lambdas/src/bnas-analysis/post-bnas-analysis.rs @@ -0,0 +1,45 @@ +use dotenv::dotenv; +use effortless::api::parse_request_body; +use entity::{prelude::*, wrappers}; +use lambda_http::{run, service_fn, Body, Error, IntoResponse, Request, Response}; +use lambdas::database_connect; +use sea_orm::{EntityTrait, IntoActiveModel}; +use serde_json::json; +use tracing::info; + +async fn function_handler(event: Request) -> Result, Error> { + dotenv().ok(); + + // Extract and serialize the data. + let wrapper = match parse_request_body::(&event) { + Ok(value) => value, + Err(e) => return Ok(e.into()), + }; + + // Turn the model wrapper into an active model. + let active_model = wrapper.into_active_model(); + + // Get the database connection. + let db = database_connect(Some("DATABASE_URL_SECRET_ID")).await?; + + // And insert a new entry. + info!( + "inserting Brokenspoke pipeline into database: {:?}", + active_model + ); + let res = BrokenspokePipeline::insert(active_model).exec(&db).await?; + Ok(json!(res.last_insert_id).into_response().await) +} + +#[tokio::main] +async fn main() -> Result<(), Error> { + tracing_subscriber::fmt() + .with_max_level(tracing::Level::INFO) + // disable printing the name of the module in every log line. + .with_target(false) + // disabling time is handy because CloudWatch will add the ingestion time. + .without_time() + .init(); + + run(service_fn(function_handler)).await +} diff --git a/lambdas/src/cities-submissions/patch-cities-submissions.rs b/lambdas/src/cities-submissions/patch-cities-submissions.rs index db9357c..fde51e6 100644 --- a/lambdas/src/cities-submissions/patch-cities-submissions.rs +++ b/lambdas/src/cities-submissions/patch-cities-submissions.rs @@ -14,21 +14,21 @@ async fn function_handler(event: Request) -> Result, Error> { dotenv().ok(); // Prepare the model to update. - let active_submission = match prepare_active_model(&event) { - Ok(submission) => submission, + let active_model = match prepare_active_model(&event) { + Ok(model) => model, Err(e) => return Ok(e.into()), }; info!( "updating Submission {:?} into database: {:?}", - active_submission.id, active_submission + active_model.id, active_model ); // Get the database connection. let db = database_connect(Some("DATABASE_URL_SECRET_ID")).await?; - // Update the Submission entry. - let res = Submission::update(active_submission).exec(&db).await?; + // Update the entry. + let res = Submission::update(active_model).exec(&db).await?; Ok(json!(res.id).into_response().await) } @@ -63,15 +63,18 @@ pub fn prepare_active_model(event: &Request) -> Result { #[cfg(test)] mod tests { use super::*; + use aws_lambda_events::http; use lambda_http::RequestExt; use std::collections::HashMap; #[test] fn test_prepare_active_model() { - let event = Request::new("{\"city\":\"santa rosa\",\"consent\":true,\"country\":\"usa\",\"email\":\"jane.dpe@orgllc.com\",\"fips_code\":\"3570670\",\"first_name\":\"Jane\",\"id\":1,\"last_name\":\"Doe\",\"organization\":\"Organization LLC\",\"region\":\"new mexico\",\"status\":\"Approved\",\"title\":\"CTO\"}" - .into()).with_path_parameters(HashMap::from([("id".to_string(), "1".to_string())])).with_request_context(lambda_http::request::RequestContext::ApiGatewayV2( - lambda_http::aws_lambda_events::apigw::ApiGatewayV2httpRequestContext::default(), - )); + let event = http::Request::builder() + .header(http::header::CONTENT_TYPE, "application/json") + .body(Body::from(r#"{"city": "santa rosa","country": "usa","email": "jane.dpe@orgllc.com","fips_code": "3570670","first_name": "Jane","last_name": "Doe","organization": "Organization LLC","region": "new mexico","title": "CTO","consent": true}"#)) + .expect("failed to build request") + .with_path_parameters(HashMap::from([("id".to_string(), "1".to_string())])) + .with_request_context(lambda_http::request::RequestContext::ApiGatewayV2(lambda_http::aws_lambda_events::apigw::ApiGatewayV2httpRequestContext::default())); let active_submission = prepare_active_model(&event).unwrap(); assert_eq!( active_submission.country, diff --git a/lambdas/src/lib.rs b/lambdas/src/lib.rs index 55e1b07..d36ae02 100644 --- a/lambdas/src/lib.rs +++ b/lambdas/src/lib.rs @@ -294,6 +294,7 @@ pub async fn api_database_connect(event: &Request) -> APIResult(&event).unwrap(); assert_eq!(submission.country, "usa") } #[test] fn test_apigw_parse_request_body() { - let event = Request::new("{\n \"city\": \"santa rosa\",\n \"country\": \"usa\",\n \"email\": \"jane.dpe@orgllc.com\",\n \"fips_code\": \"3570670\",\n \"first_name\": \"Jane\",\n \"last_name\": \"Doe\",\n \"organization\": \"Organization LLC\",\n \"region\": \"new mexico\",\n \"title\": \"CTO\",\n \"consent\": true\n}".into()).with_request_context(lambda_http::request::RequestContext::ApiGatewayV2( - lambda_http::aws_lambda_events::apigw::ApiGatewayV2httpRequestContext::default(), - )); + let event = http::Request::builder() + .header(http::header::CONTENT_TYPE, "application/json") + .body(Body::from(r#"{"city": "santa rosa","country": "usa","email": "jane.dpe@orgllc.com","fips_code": "3570670","first_name": "Jane","last_name": "Doe","organization": "Organization LLC","region": "new mexico","title": "CTO","consent": true}"#)) + .expect("failed to build request"); let submission = parse_request_body::(&event).unwrap(); assert_eq!(submission.country, "usa") } diff --git a/migration/src/lib.rs b/migration/src/lib.rs index 3631e11..846b7eb 100644 --- a/migration/src/lib.rs +++ b/migration/src/lib.rs @@ -2,6 +2,7 @@ pub use sea_orm_migration::prelude::*; mod m20220101_000001_create_table; mod m20231010_232527_city_submission; +mod m20240202_004130_brokenspoke_analyzer_pipeline; pub struct Migrator; @@ -11,6 +12,7 @@ impl MigratorTrait for Migrator { vec![ Box::new(m20220101_000001_create_table::Migration), Box::new(m20231010_232527_city_submission::Migration), + Box::new(m20240202_004130_brokenspoke_analyzer_pipeline::Migration), ] } } diff --git a/migration/src/m20240202_004130_brokenspoke_analyzer_pipeline.rs b/migration/src/m20240202_004130_brokenspoke_analyzer_pipeline.rs new file mode 100644 index 0000000..cb681cc --- /dev/null +++ b/migration/src/m20240202_004130_brokenspoke_analyzer_pipeline.rs @@ -0,0 +1,91 @@ +use sea_orm::{EnumIter, Iterable}; +use sea_orm_migration::{prelude::*, sea_query::extension::postgres::Type}; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + // Create the approval status type. + manager + .create_type( + Type::create() + .as_enum(BrokenspokeStatus::Table) + .values(BrokenspokeStatus::iter().skip(1)) + .to_owned(), + ) + .await?; + + // Create the approval status type. + manager + .create_type( + Type::create() + .as_enum(BrokenspokeState::Table) + .values(BrokenspokeState::iter().skip(1)) + .to_owned(), + ) + .await?; + + manager + .create_table( + Table::create() + .table(BrokenspokePipeline::Table) + .if_not_exists() + .col( + ColumnDef::new(BrokenspokePipeline::Id) + .integer() + .not_null() + .auto_increment() + .primary_key(), + ) + .col( + ColumnDef::new(BrokenspokePipeline::State) + .enumeration(BrokenspokeState::Table, BrokenspokeState::iter().skip(1)), + ) + .col(ColumnDef::new(BrokenspokePipeline::StateMachineId).string()) + .col(ColumnDef::new(BrokenspokePipeline::SqsMessage).json()) + .col(ColumnDef::new(BrokenspokePipeline::NeonBranchId).string()) + .col(ColumnDef::new(BrokenspokePipeline::FargateTaskId).uuid()) + .col(ColumnDef::new(BrokenspokePipeline::S3Bucket).string()) + .to_owned(), + ) + .await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .drop_table(Table::drop().table(BrokenspokePipeline::Table).to_owned()) + .await + } +} + +#[derive(DeriveIden)] +enum BrokenspokePipeline { + Table, + Id, + State, + StateMachineId, + SqsMessage, + NeonBranchId, + FargateTaskId, + S3Bucket, +} + +#[derive(Iden, EnumIter)] +pub enum BrokenspokeStatus { + Table, + Pending, + Started, + Complete, +} + +#[derive(Iden, EnumIter)] +pub enum BrokenspokeState { + Table, + Pipeline, + SqsMessage, + Setup, + Analysis, + Export, +}