From 6c98dc6ca0a22e665065cf5399c82caa6fe2d210 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9my=20Greinhofer?= Date: Sun, 11 Feb 2024 13:51:26 -0600 Subject: [PATCH] Update Brokenspoke Pipeline table MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Update the table to reflect the use of the State Machine Context to extract the State Machine ID and use it as primary key. Signed-off-by: Rémy Greinhofer --- docs/database.dbml | 6 +- docs/database.sql | 28 +- docs/database.svg | 915 ++++++++++-------- entity/src/entities/brokenspoke_pipeline.rs | 7 +- entity/src/wrappers/mod.rs | 16 +- lambdas/requests.rest | 21 +- .../src/bnas-analysis/get-bnas-analysis.rs | 4 +- .../src/bnas-analysis/patch-bnas-analysis.rs | 10 +- ...02_004130_brokenspoke_analyzer_pipeline.rs | 15 +- 9 files changed, 570 insertions(+), 452 deletions(-) diff --git a/docs/database.dbml b/docs/database.dbml index b462354..35a157c 100644 --- a/docs/database.dbml +++ b/docs/database.dbml @@ -203,15 +203,15 @@ enum brokenspoke_state { } Table bna.brokenspoke_pipeline { - brokenspoke_pipeline_id int [pk, increment] + state_machine_id uuid [pk] + scheduled_trigger_id uuid [unique] state brokenspoke_state - state_machine_id uuid sqs_message json neon_branch_id varchar(50) fargate_task_id uuid s3_bucket varchar(50) indexes { - broken_spoke_pipeline_id + state_machine_id } } diff --git a/docs/database.sql b/docs/database.sql index 08f1c86..60b1643 100644 --- a/docs/database.sql +++ b/docs/database.sql @@ -1,6 +1,6 @@ -- SQL dump generated using DBML (dbml-lang.org) -- Database: PostgreSQL --- Generated at: 2024-01-08T03:21:17.522Z +-- Generated at: 2024-02-11T19:57:43.906Z CREATE SCHEMA "bna"; @@ -16,6 +16,20 @@ CREATE TYPE "approval_status" AS ENUM ( 'rejected' ); +CREATE TYPE "brokenspoke_status" AS ENUM ( + 'pending', + 'started', + 'complete' +); + +CREATE TYPE "brokenspoke_state" AS ENUM ( + 'pipeline', + 'sqs_message', + 'setup', + 'analysis', + 'export' +); + CREATE TABLE "bna"."summary" ( "bna_uuid" uuid PRIMARY KEY, "city_id" uuid, @@ -118,6 +132,16 @@ CREATE TABLE "bna"."submission" ( "status" approval_status ); +CREATE TABLE "bna"."brokenspoke_pipeline" ( + "state_machine_id" uuid PRIMARY KEY, + "scheduled_trigger_id" uuid UNIQUE, + "state" brokenspoke_state, + "sqs_message" json, + "neon_branch_id" varchar(50), + "fargate_task_id" uuid, + "s3_bucket" varchar(50) +); + CREATE INDEX ON "bna"."summary" ("bna_uuid"); CREATE INDEX ON "bna"."summary" ("city_id"); @@ -152,6 +176,8 @@ CREATE INDEX ON "bna"."speed_limit" ("speed_limit_id"); CREATE INDEX ON "bna"."submission" ("submission_id"); +CREATE INDEX ON "bna"."brokenspoke_pipeline" ("state_machine_id"); + ALTER TABLE "bna"."summary" ADD FOREIGN KEY ("city_id") REFERENCES "bna"."city" ("city_id"); ALTER TABLE "bna"."census" ADD FOREIGN KEY ("city_id") REFERENCES "bna"."city" ("city_id"); diff --git a/docs/database.svg b/docs/database.svg index e80e1cd..f8bf0d1 100644 --- a/docs/database.svg +++ b/docs/database.svg @@ -4,532 +4,617 @@ - - + + dbml cluster_analysis - -analysis + +analysis cluster_city - -city + +city size - - - -       size        - - -    small     - - -    medium     - - -    large     - + + + +       size        + + +    small     + + +    medium     + + +    large     + approval_status - - - -       approval_status        - - -    pending     - - -    approved     - - -    rejected     - + + + +       approval_status        + + +    pending     + + +    approved     + + +    rejected     + + + + +brokenspoke_status + + + +       brokenspoke_status        + + +    pending     + + +    started     + + +    complete     + + + + +brokenspoke_state + + + +       brokenspoke_state        + + +    pipeline     + + +    sqs_message     + + +    setup     + + +    analysis     + + +    export     + bna.summary - - - -       bna.summary        - - -bna_uuid -     -uuid - - -city_id     -uuid - - -created_at     -date - - -score     -float - - -version     -varchar(10) - + + + +       bna.summary        + + +bna_uuid +     +uuid + + +city_id     +uuid + + +created_at     +date + + +score     +float + + +version     +varchar(10) + bna.features - - - -       bna.features        - - -bna_uuid -     -uuid - - -people     -float - - -retail     -float - - -transit     -float - + + + +       bna.features        + + +bna_uuid +     +uuid + + +people     +float + + +retail     +float + + +transit     +float + bna.summary:e->bna.features:w - - -1 -1 + + +1 +1 bna.core_services - - - -       bna.core_services        - - -bna_uuid -     -uuid - - -dentists     -float - - -doctors     -float - - -grocery     -float - - -hospitals     -float - - -pharmacies     -float - - -score     -float - - -social_services     -float - + + + +       bna.core_services        + + +bna_uuid +     +uuid + + +dentists     +float + + +doctors     +float + + +grocery     +float + + +hospitals     +float + + +pharmacies     +float + + +score     +float + + +social_services     +float + bna.summary:e->bna.core_services:w - - -1 -1 + + +1 +1 bna.opportunity - - - -       bna.opportunity        - - -bna_uuid -     -uuid - - -employment     -float - - -higher_education     -float - - -k12_education     -float - - -score     -float - - -technical_vocational_college     -float - + + + +       bna.opportunity        + + +bna_uuid +     +uuid + + +employment     +float + + +higher_education     +float + + +k12_education     +float + + +score     +float + + +technical_vocational_college     +float + bna.summary:e->bna.opportunity:w - - -1 -1 + + +1 +1 bna.recreation - - - -       bna.recreation        - - -bna_uuid -     -uuid - - -community_centers     -float - - -parks     -float - - -recreation_trails     -float - - -score     -float - + + + +       bna.recreation        + + +bna_uuid +     +uuid + + +community_centers     +float + + +parks     +float + + +recreation_trails     +float + + +score     +float + bna.summary:e->bna.recreation:w - - -1 -1 + + +1 +1 bna.infrastructure - - - -       bna.infrastructure        - - -bna_uuid -     -uuid - - -low_stress_miles     -float - - -high_stress_miles     -float - + + + +       bna.infrastructure        + + +bna_uuid +     +uuid + + +low_stress_miles     +float + + +high_stress_miles     +float + bna.summary:e->bna.infrastructure:w - - -1 -1 + + +1 +1 bna.city - - - -       bna.city        - - -city_id -     -uuid - - -country     -varchar(50) - - -latitude     -float - - -longitude     -float - - -name     -varchar(50) - - -region     -varchar(50) - - -state     -varchar(50) - - -state_abbrev     -char(2) - + + + +       bna.city        + + +city_id +     +uuid + + +country     +varchar(50) + + +latitude     +float + + +longitude     +float + + +name     +varchar(50) + + +region     +varchar(50) + + +state     +varchar(50) + + +state_abbrev     +char(2) + bna.city:e->bna.summary:w - - -* -1 + + +* +1 bna.census - - - -       bna.census        - - -census_id -     -int - - -city_id     -uuid - - -created_at     -date - - -fips_code     -char(7) - - -pop_size     -size - - -population     -int - + + + +       bna.census        + + +census_id +     +int + + +city_id     +uuid + + +created_at     +date + + +fips_code     +char(7) + + +pop_size     +size + + +population     +int + bna.city:e->bna.census:w - - -* -1 + + +* +1 bna.ranking - - - -       bna.ranking        - - -ranking_id -     -int - - -city_id     -uuid - - -country     -int - - -country_size     -int - - -created_at     -date - - -global     -int - - -size     -int - - -state     -int - + + + +       bna.ranking        + + +ranking_id +     +int + + +city_id     +uuid + + +country     +int + + +country_size     +int + + +created_at     +date + + +global     +int + + +size     +int + + +state     +int + bna.city:e->bna.ranking:w - - -* -1 + + +* +1 bna.speed_limit - - - -       bna.speed_limit        - - -speed_limit_id -     -int - - -city_id     -uuid - - -created_at     -date - - -residential     -int - + + + +       bna.speed_limit        + + +speed_limit_id +     +int + + +city_id     +uuid + + +created_at     +date + + +residential     +int + bna.city:e->bna.speed_limit:w - - -* -1 + + +* +1 bna.census:e->size:w - + bna.submission - - - -       bna.submission        - - -submission_id -     -int - - -firstname     -varchar(50) - - -lastname     -varchar(50) - - -title     -varchar(50) - - -organization     -varchar(50) - - -email     -varchar(50) - - -country     -varchar(50) - - -city     -varchar(50) - - -region     -varchar(50) - - -fipscode     -varchar(50) - - -consent     -varchar(50) - - -status     -approval_status - + + + +       bna.submission        + + +submission_id +     +int + + +firstname     +varchar(50) + + +lastname     +varchar(50) + + +title     +varchar(50) + + +organization     +varchar(50) + + +email     +varchar(50) + + +country     +varchar(50) + + +city     +varchar(50) + + +region     +varchar(50) + + +fipscode     +varchar(50) + + +consent     +varchar(50) + + +status     +approval_status + bna.submission:e->approval_status:w - + + + + +bna.brokenspoke_pipeline + + + +       bna.brokenspoke_pipeline        + + +state_machine_id +     +uuid + + +scheduled_trigger_id     +uuid + + +state     +brokenspoke_state + + +sqs_message     +json + + +neon_branch_id     +varchar(50) + + +fargate_task_id     +uuid + + +s3_bucket     +varchar(50) + + + + +bna.brokenspoke_pipeline:e->brokenspoke_state:w + diff --git a/entity/src/entities/brokenspoke_pipeline.rs b/entity/src/entities/brokenspoke_pipeline.rs index 0a1c174..67ef045 100644 --- a/entity/src/entities/brokenspoke_pipeline.rs +++ b/entity/src/entities/brokenspoke_pipeline.rs @@ -7,10 +7,11 @@ use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)] #[sea_orm(table_name = "brokenspoke_pipeline")] pub struct Model { - #[sea_orm(primary_key)] - pub id: i32, + #[sea_orm(primary_key, auto_increment = false)] + pub state_machine_id: Uuid, + #[sea_orm(unique)] + pub scheduled_trigger_id: Option, pub state: Option, - pub state_machine_id: Option, pub sqs_message: Option, pub neon_branch_id: Option, pub fargate_task_id: Option, diff --git a/entity/src/wrappers/mod.rs b/entity/src/wrappers/mod.rs index d7f5cdf..ba16cc3 100644 --- a/entity/src/wrappers/mod.rs +++ b/entity/src/wrappers/mod.rs @@ -158,7 +158,8 @@ impl IntoActiveModel for SubmissionPatch { #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct BrokenspokePipelinePost { pub state: Option, - pub state_machine_id: Option, + pub state_machine_id: Uuid, + pub scheduled_trigger_id: Option, pub sqs_message: Option, pub neon_branch_id: Option, pub fargate_task_id: Option, @@ -168,13 +169,13 @@ pub struct BrokenspokePipelinePost { impl IntoActiveModel for BrokenspokePipelinePost { fn into_active_model(self) -> brokenspoke_pipeline::ActiveModel { brokenspoke_pipeline::ActiveModel { - id: ActiveValue::NotSet, state: ActiveValue::Set(self.state), state_machine_id: ActiveValue::Set(self.state_machine_id), sqs_message: ActiveValue::Set(self.sqs_message), neon_branch_id: ActiveValue::Set(self.neon_branch_id), fargate_task_id: ActiveValue::Set(self.fargate_task_id), s3_bucket: ActiveValue::Set(self.s3_bucket), + scheduled_trigger_id: ActiveValue::Set(self.scheduled_trigger_id), } } } @@ -182,7 +183,8 @@ impl IntoActiveModel for BrokenspokePipelineP #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct BrokenspokePipelinePatch { pub state: Option>, - pub state_machine_id: Option>, + pub state_machine_id: Uuid, + pub scheduled_trigger_id: Option>, pub sqs_message: Option>, pub neon_branch_id: Option>, pub fargate_task_id: Option>, @@ -192,11 +194,8 @@ pub struct BrokenspokePipelinePatch { impl IntoActiveModel for BrokenspokePipelinePatch { fn into_active_model(self) -> brokenspoke_pipeline::ActiveModel { brokenspoke_pipeline::ActiveModel { - id: ActiveValue::NotSet, state: self.state.map_or(ActiveValue::NotSet, ActiveValue::Set), - state_machine_id: self - .state_machine_id - .map_or(ActiveValue::NotSet, ActiveValue::Set), + state_machine_id: ActiveValue::Unchanged(self.state_machine_id), sqs_message: self .sqs_message .map_or(ActiveValue::NotSet, ActiveValue::Set), @@ -207,6 +206,9 @@ impl IntoActiveModel for BrokenspokePipelineP .fargate_task_id .map_or(ActiveValue::NotSet, ActiveValue::Set), s3_bucket: self.s3_bucket.map_or(ActiveValue::NotSet, ActiveValue::Set), + scheduled_trigger_id: self + .scheduled_trigger_id + .map_or(ActiveValue::NotSet, ActiveValue::Set), } } } diff --git a/lambdas/requests.rest b/lambdas/requests.rest index 4552154..cc68c2a 100644 --- a/lambdas/requests.rest +++ b/lambdas/requests.rest @@ -136,14 +136,6 @@ Authorization: Bearer {{cognito_access}} "status": "Accepted" } -### Query the BNA analysis performed by the Brokenspoke-analyzer pipeline. -GET {{host}}/bnas/analysis -Authorization: Bearer {{cognito_access}} - -### Query a specific BNA analysis performed by the Brokenspoke-analyzer pipeline. -GET {{host}}/bnas/analysis/1 -Authorization: Bearer {{cognito_access}} - ### Create a new BNA analysis performed by the Brokenspoke-analyzer pipeline. POST {{host}}/bnas/analysis content-type: application/json @@ -151,15 +143,24 @@ Authorization: Bearer {{cognito_access}} { "state": "Analysis", - "state_machine_id": "166680c0-4e74-409b-b095-ae45ae458f0a", + "state_machine_id": "166680c0-4e74-409b-b095-ae45ae458f0", + "scheduled_trigger_id": "04ca18b9-6e0c-1aa5-2c3f-d4b445f840bc", "sqs_message": "{\"city\": \"Valetta\", \"country\": \"Malta\"}", "neon_branch_id": null, "fargate_task_id": null, "s3_bucket": null } +### Query the BNA analysis performed by the Brokenspoke-analyzer pipeline. +GET {{host}}/bnas/analysis +Authorization: Bearer {{cognito_access}} + +### Query a specific BNA analysis performed by the Brokenspoke-analyzer pipeline. +GET {{host}}/bnas/analysis/166680c0-4e74-409b-b095-ae45ae458f0 +Authorization: Bearer {{cognito_access}} + ### Update an existing BNA analysis performed by the Brokenspoke-analyzer pipeline. -PATCH {{host}}/bnas/analysis/1 +PATCH {{host}}/bnas/analysis/166680c0-4e74-409b-b095-ae45ae458f0 content-type: application/json Authorization: Bearer {{cognito_access}} diff --git a/lambdas/src/bnas-analysis/get-bnas-analysis.rs b/lambdas/src/bnas-analysis/get-bnas-analysis.rs index 66cdb44..ef42ccb 100644 --- a/lambdas/src/bnas-analysis/get-bnas-analysis.rs +++ b/lambdas/src/bnas-analysis/get-bnas-analysis.rs @@ -7,7 +7,7 @@ use effortless::{ use entity::prelude::*; use lambda_http::{run, service_fn, Body, Error, IntoResponse, Request, Response}; use lambdas::{api_database_connect, build_paginated_response, pagination_parameters}; -use sea_orm::{EntityTrait, PaginatorTrait}; +use sea_orm::{prelude::Uuid, EntityTrait, PaginatorTrait}; use serde_json::json; use tracing::debug; @@ -22,7 +22,7 @@ async fn function_handler(event: Request) -> Result, Error> { // Retrieve the ID of the entry to update. let parameter = "id"; - let id = event.path_parameter::(parameter); + let id = event.path_parameter::(parameter); // Set the database connection. let db = match api_database_connect(&event).await { diff --git a/lambdas/src/bnas-analysis/patch-bnas-analysis.rs b/lambdas/src/bnas-analysis/patch-bnas-analysis.rs index 1fa81cf..670b3ec 100644 --- a/lambdas/src/bnas-analysis/patch-bnas-analysis.rs +++ b/lambdas/src/bnas-analysis/patch-bnas-analysis.rs @@ -7,7 +7,7 @@ use effortless::{ use entity::{brokenspoke_pipeline::ActiveModel, prelude::*, wrappers}; use lambda_http::{run, service_fn, Body, Error, IntoResponse, Request, Response}; use lambdas::database_connect; -use sea_orm::{ActiveValue, EntityTrait, IntoActiveModel}; +use sea_orm::{prelude::Uuid, ActiveValue, EntityTrait, IntoActiveModel}; use serde_json::json; use tracing::info; @@ -22,7 +22,7 @@ async fn function_handler(event: Request) -> Result, Error> { info!( "updating Submission {:?} into database: {:?}", - active_model.id, active_model + active_model.state_machine_id, active_model ); // Get the database connection. @@ -30,14 +30,14 @@ async fn function_handler(event: Request) -> Result, Error> { // Update the entry. let res = BrokenspokePipeline::update(active_model).exec(&db).await?; - Ok(json!(res.id).into_response().await) + Ok(json!(res.state_machine_id).into_response().await) } pub fn prepare_active_model(event: &Request) -> Result { // Retrieve the ID of the entry to update. let parameter = "id"; let id = event - .path_parameter::(parameter) + .path_parameter::(parameter) .ok_or(missing_parameter(event, parameter))? .map_err(|e| invalid_path_parameter(event, parameter, e.to_string().as_str()))?; @@ -46,7 +46,7 @@ pub fn prepare_active_model(event: &Request) -> Result { // Turn the wrapper into an active model. let mut active_model = wrapper.into_active_model(); - active_model.id = ActiveValue::Unchanged(id); + active_model.state_machine_id = ActiveValue::Unchanged(id); Ok(active_model) } diff --git a/migration/src/m20240202_004130_brokenspoke_analyzer_pipeline.rs b/migration/src/m20240202_004130_brokenspoke_analyzer_pipeline.rs index cb681cc..2ed5679 100644 --- a/migration/src/m20240202_004130_brokenspoke_analyzer_pipeline.rs +++ b/migration/src/m20240202_004130_brokenspoke_analyzer_pipeline.rs @@ -33,17 +33,20 @@ impl MigrationTrait for Migration { .table(BrokenspokePipeline::Table) .if_not_exists() .col( - ColumnDef::new(BrokenspokePipeline::Id) - .integer() + ColumnDef::new(BrokenspokePipeline::StateMachineId) + .uuid() .not_null() - .auto_increment() .primary_key(), ) + .col( + ColumnDef::new(BrokenspokePipeline::ScheduledTriggerId) + .uuid() + .unique_key(), + ) .col( ColumnDef::new(BrokenspokePipeline::State) .enumeration(BrokenspokeState::Table, BrokenspokeState::iter().skip(1)), ) - .col(ColumnDef::new(BrokenspokePipeline::StateMachineId).string()) .col(ColumnDef::new(BrokenspokePipeline::SqsMessage).json()) .col(ColumnDef::new(BrokenspokePipeline::NeonBranchId).string()) .col(ColumnDef::new(BrokenspokePipeline::FargateTaskId).uuid()) @@ -63,9 +66,9 @@ impl MigrationTrait for Migration { #[derive(DeriveIden)] enum BrokenspokePipeline { Table, - Id, - State, StateMachineId, + ScheduledTriggerId, + State, SqsMessage, NeonBranchId, FargateTaskId,