Skip to content

Commit

Permalink
Update Brokenspoke Pipeline table
Browse files Browse the repository at this point in the history
Update the table to reflect the use of the State Machine Context to
extract the State Machine ID and use it as primary key.

Signed-off-by: Rémy Greinhofer <[email protected]>
  • Loading branch information
rgreinho committed Feb 11, 2024
1 parent f3d6f9c commit 6c98dc6
Show file tree
Hide file tree
Showing 9 changed files with 570 additions and 452 deletions.
6 changes: 3 additions & 3 deletions docs/database.dbml
Original file line number Diff line number Diff line change
Expand Up @@ -203,15 +203,15 @@ enum brokenspoke_state {
}

Table bna.brokenspoke_pipeline {
brokenspoke_pipeline_id int [pk, increment]
state_machine_id uuid [pk]
scheduled_trigger_id uuid [unique]
state brokenspoke_state
state_machine_id uuid
sqs_message json
neon_branch_id varchar(50)
fargate_task_id uuid
s3_bucket varchar(50)

indexes {
broken_spoke_pipeline_id
state_machine_id
}
}
28 changes: 27 additions & 1 deletion docs/database.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
-- SQL dump generated using DBML (dbml-lang.org)
-- Database: PostgreSQL
-- Generated at: 2024-01-08T03:21:17.522Z
-- Generated at: 2024-02-11T19:57:43.906Z

CREATE SCHEMA "bna";

Expand All @@ -16,6 +16,20 @@ CREATE TYPE "approval_status" AS ENUM (
'rejected'
);

CREATE TYPE "brokenspoke_status" AS ENUM (
'pending',
'started',
'complete'
);

CREATE TYPE "brokenspoke_state" AS ENUM (
'pipeline',
'sqs_message',
'setup',
'analysis',
'export'
);

CREATE TABLE "bna"."summary" (
"bna_uuid" uuid PRIMARY KEY,
"city_id" uuid,
Expand Down Expand Up @@ -118,6 +132,16 @@ CREATE TABLE "bna"."submission" (
"status" approval_status
);

CREATE TABLE "bna"."brokenspoke_pipeline" (
"state_machine_id" uuid PRIMARY KEY,
"scheduled_trigger_id" uuid UNIQUE,
"state" brokenspoke_state,
"sqs_message" json,
"neon_branch_id" varchar(50),
"fargate_task_id" uuid,
"s3_bucket" varchar(50)
);

CREATE INDEX ON "bna"."summary" ("bna_uuid");

CREATE INDEX ON "bna"."summary" ("city_id");
Expand Down Expand Up @@ -152,6 +176,8 @@ CREATE INDEX ON "bna"."speed_limit" ("speed_limit_id");

CREATE INDEX ON "bna"."submission" ("submission_id");

CREATE INDEX ON "bna"."brokenspoke_pipeline" ("state_machine_id");

ALTER TABLE "bna"."summary" ADD FOREIGN KEY ("city_id") REFERENCES "bna"."city" ("city_id");

ALTER TABLE "bna"."census" ADD FOREIGN KEY ("city_id") REFERENCES "bna"."city" ("city_id");
Expand Down
915 changes: 500 additions & 415 deletions docs/database.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
7 changes: 4 additions & 3 deletions entity/src/entities/brokenspoke_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ use serde::{Deserialize, Serialize};
#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "brokenspoke_pipeline")]
pub struct Model {
#[sea_orm(primary_key)]
pub id: i32,
#[sea_orm(primary_key, auto_increment = false)]
pub state_machine_id: Uuid,
#[sea_orm(unique)]
pub scheduled_trigger_id: Option<Uuid>,
pub state: Option<BrokenspokeState>,
pub state_machine_id: Option<String>,
pub sqs_message: Option<Json>,
pub neon_branch_id: Option<String>,
pub fargate_task_id: Option<Uuid>,
Expand Down
16 changes: 9 additions & 7 deletions entity/src/wrappers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,8 @@ impl IntoActiveModel<submission::ActiveModel> for SubmissionPatch {
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct BrokenspokePipelinePost {
pub state: Option<sea_orm_active_enums::BrokenspokeState>,
pub state_machine_id: Option<String>,
pub state_machine_id: Uuid,
pub scheduled_trigger_id: Option<Uuid>,
pub sqs_message: Option<Json>,
pub neon_branch_id: Option<String>,
pub fargate_task_id: Option<Uuid>,
Expand All @@ -168,21 +169,22 @@ pub struct BrokenspokePipelinePost {
impl IntoActiveModel<brokenspoke_pipeline::ActiveModel> for BrokenspokePipelinePost {
fn into_active_model(self) -> brokenspoke_pipeline::ActiveModel {
brokenspoke_pipeline::ActiveModel {
id: ActiveValue::NotSet,
state: ActiveValue::Set(self.state),
state_machine_id: ActiveValue::Set(self.state_machine_id),
sqs_message: ActiveValue::Set(self.sqs_message),
neon_branch_id: ActiveValue::Set(self.neon_branch_id),
fargate_task_id: ActiveValue::Set(self.fargate_task_id),
s3_bucket: ActiveValue::Set(self.s3_bucket),
scheduled_trigger_id: ActiveValue::Set(self.scheduled_trigger_id),
}
}
}

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct BrokenspokePipelinePatch {
pub state: Option<Option<sea_orm_active_enums::BrokenspokeState>>,
pub state_machine_id: Option<Option<String>>,
pub state_machine_id: Uuid,
pub scheduled_trigger_id: Option<Option<Uuid>>,
pub sqs_message: Option<Option<Json>>,
pub neon_branch_id: Option<Option<String>>,
pub fargate_task_id: Option<Option<Uuid>>,
Expand All @@ -192,11 +194,8 @@ pub struct BrokenspokePipelinePatch {
impl IntoActiveModel<brokenspoke_pipeline::ActiveModel> for BrokenspokePipelinePatch {
fn into_active_model(self) -> brokenspoke_pipeline::ActiveModel {
brokenspoke_pipeline::ActiveModel {
id: ActiveValue::NotSet,
state: self.state.map_or(ActiveValue::NotSet, ActiveValue::Set),
state_machine_id: self
.state_machine_id
.map_or(ActiveValue::NotSet, ActiveValue::Set),
state_machine_id: ActiveValue::Unchanged(self.state_machine_id),
sqs_message: self
.sqs_message
.map_or(ActiveValue::NotSet, ActiveValue::Set),
Expand All @@ -207,6 +206,9 @@ impl IntoActiveModel<brokenspoke_pipeline::ActiveModel> for BrokenspokePipelineP
.fargate_task_id
.map_or(ActiveValue::NotSet, ActiveValue::Set),
s3_bucket: self.s3_bucket.map_or(ActiveValue::NotSet, ActiveValue::Set),
scheduled_trigger_id: self
.scheduled_trigger_id
.map_or(ActiveValue::NotSet, ActiveValue::Set),
}
}
}
Expand Down
21 changes: 11 additions & 10 deletions lambdas/requests.rest
Original file line number Diff line number Diff line change
Expand Up @@ -136,30 +136,31 @@ Authorization: Bearer {{cognito_access}}
"status": "Accepted"
}

### Query the BNA analysis performed by the Brokenspoke-analyzer pipeline.
GET {{host}}/bnas/analysis
Authorization: Bearer {{cognito_access}}

### Query a specific BNA analysis performed by the Brokenspoke-analyzer pipeline.
GET {{host}}/bnas/analysis/1
Authorization: Bearer {{cognito_access}}

### Create a new BNA analysis performed by the Brokenspoke-analyzer pipeline.
POST {{host}}/bnas/analysis
content-type: application/json
Authorization: Bearer {{cognito_access}}

{
"state": "Analysis",
"state_machine_id": "166680c0-4e74-409b-b095-ae45ae458f0a",
"state_machine_id": "166680c0-4e74-409b-b095-ae45ae458f0",
"scheduled_trigger_id": "04ca18b9-6e0c-1aa5-2c3f-d4b445f840bc",
"sqs_message": "{\"city\": \"Valetta\", \"country\": \"Malta\"}",
"neon_branch_id": null,
"fargate_task_id": null,
"s3_bucket": null
}

### Query the BNA analysis performed by the Brokenspoke-analyzer pipeline.
GET {{host}}/bnas/analysis
Authorization: Bearer {{cognito_access}}

### Query a specific BNA analysis performed by the Brokenspoke-analyzer pipeline.
GET {{host}}/bnas/analysis/166680c0-4e74-409b-b095-ae45ae458f0
Authorization: Bearer {{cognito_access}}

### Update an existing BNA analysis performed by the Brokenspoke-analyzer pipeline.
PATCH {{host}}/bnas/analysis/1
PATCH {{host}}/bnas/analysis/166680c0-4e74-409b-b095-ae45ae458f0
content-type: application/json
Authorization: Bearer {{cognito_access}}

Expand Down
4 changes: 2 additions & 2 deletions lambdas/src/bnas-analysis/get-bnas-analysis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use effortless::{
use entity::prelude::*;
use lambda_http::{run, service_fn, Body, Error, IntoResponse, Request, Response};
use lambdas::{api_database_connect, build_paginated_response, pagination_parameters};
use sea_orm::{EntityTrait, PaginatorTrait};
use sea_orm::{prelude::Uuid, EntityTrait, PaginatorTrait};
use serde_json::json;
use tracing::debug;

Expand All @@ -22,7 +22,7 @@ async fn function_handler(event: Request) -> Result<Response<Body>, Error> {

// Retrieve the ID of the entry to update.
let parameter = "id";
let id = event.path_parameter::<i32>(parameter);
let id = event.path_parameter::<Uuid>(parameter);

// Set the database connection.
let db = match api_database_connect(&event).await {
Expand Down
10 changes: 5 additions & 5 deletions lambdas/src/bnas-analysis/patch-bnas-analysis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use effortless::{
use entity::{brokenspoke_pipeline::ActiveModel, prelude::*, wrappers};
use lambda_http::{run, service_fn, Body, Error, IntoResponse, Request, Response};
use lambdas::database_connect;
use sea_orm::{ActiveValue, EntityTrait, IntoActiveModel};
use sea_orm::{prelude::Uuid, ActiveValue, EntityTrait, IntoActiveModel};
use serde_json::json;
use tracing::info;

Expand All @@ -22,22 +22,22 @@ async fn function_handler(event: Request) -> Result<Response<Body>, Error> {

info!(
"updating Submission {:?} into database: {:?}",
active_model.id, active_model
active_model.state_machine_id, active_model
);

// Get the database connection.
let db = database_connect(Some("DATABASE_URL_SECRET_ID")).await?;

// Update the entry.
let res = BrokenspokePipeline::update(active_model).exec(&db).await?;
Ok(json!(res.id).into_response().await)
Ok(json!(res.state_machine_id).into_response().await)
}

pub fn prepare_active_model(event: &Request) -> Result<ActiveModel, APIErrors> {
// Retrieve the ID of the entry to update.
let parameter = "id";
let id = event
.path_parameter::<i32>(parameter)
.path_parameter::<Uuid>(parameter)
.ok_or(missing_parameter(event, parameter))?
.map_err(|e| invalid_path_parameter(event, parameter, e.to_string().as_str()))?;

Expand All @@ -46,7 +46,7 @@ pub fn prepare_active_model(event: &Request) -> Result<ActiveModel, APIErrors> {

// Turn the wrapper into an active model.
let mut active_model = wrapper.into_active_model();
active_model.id = ActiveValue::Unchanged(id);
active_model.state_machine_id = ActiveValue::Unchanged(id);
Ok(active_model)
}

Expand Down
15 changes: 9 additions & 6 deletions migration/src/m20240202_004130_brokenspoke_analyzer_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,17 +33,20 @@ impl MigrationTrait for Migration {
.table(BrokenspokePipeline::Table)
.if_not_exists()
.col(
ColumnDef::new(BrokenspokePipeline::Id)
.integer()
ColumnDef::new(BrokenspokePipeline::StateMachineId)
.uuid()
.not_null()
.auto_increment()
.primary_key(),
)
.col(
ColumnDef::new(BrokenspokePipeline::ScheduledTriggerId)
.uuid()
.unique_key(),
)
.col(
ColumnDef::new(BrokenspokePipeline::State)
.enumeration(BrokenspokeState::Table, BrokenspokeState::iter().skip(1)),
)
.col(ColumnDef::new(BrokenspokePipeline::StateMachineId).string())
.col(ColumnDef::new(BrokenspokePipeline::SqsMessage).json())
.col(ColumnDef::new(BrokenspokePipeline::NeonBranchId).string())
.col(ColumnDef::new(BrokenspokePipeline::FargateTaskId).uuid())
Expand All @@ -63,9 +66,9 @@ impl MigrationTrait for Migration {
#[derive(DeriveIden)]
enum BrokenspokePipeline {
Table,
Id,
State,
StateMachineId,
ScheduledTriggerId,
State,
SqsMessage,
NeonBranchId,
FargateTaskId,
Expand Down

0 comments on commit 6c98dc6

Please sign in to comment.