From 927367a3e1701ec36c887d01894207ab18e9ba54 Mon Sep 17 00:00:00 2001
From: Marko Malenic
Date: Mon, 23 Oct 2023 16:02:57 +1100
Subject: [PATCH] docs: add documentation comments and update README.md.

/cc @brainstorm.
---
 lib/workload/stateless/filemanager/README.md  | 21 +++++++++++++++----
 .../migrations/0001_add_object_table.sql      |  1 +
 .../deploy/stack/filemanager_stack.ts         |  3 +++
 .../filemanager-http-lambda/src/main.rs       |  3 ++-
 .../filemanager-ingest-lambda/src/main.rs     |  2 +-
 .../filemanager/src/database/mod.rs           |  4 ++++
 .../filemanager/src/database/s3/ingester.rs   |  3 +++
 .../filemanager/filemanager/src/env.rs        |  3 +++
 .../filemanager/filemanager/src/error.rs      |  1 +
 .../filemanager/src/events/s3/collect.rs      |  3 +++
 .../filemanager/src/events/s3/mod.rs          |  4 +++-
 .../filemanager/src/events/s3/s3_client.rs    |  5 +++++
 .../filemanager/src/events/s3/sqs_client.rs   |  6 +++++-
 13 files changed, 51 insertions(+), 8 deletions(-)

diff --git a/lib/workload/stateless/filemanager/README.md b/lib/workload/stateless/filemanager/README.md
index b6fb0a4cf..0ae44d572 100644
--- a/lib/workload/stateless/filemanager/README.md
+++ b/lib/workload/stateless/filemanager/README.md
@@ -20,11 +20,24 @@ Start the postgres database and ensure that an `.env` file is set containing the
 docker compose up
 ```
 
-For development of the rust workspace:
+The filemanager uses sqlx to check that queries succeed against a database at compile time.
+A `.env` file provides the `DATABASE_URL` that sqlx needs to perform this compile-time checking.
+
+Filemanager uses docker to run a postgres database to track objects, which means that sqlx connects to the postgres
+server running inside the docker compose container. If there is an additional postgres installation locally (outside
+of docker), it might interfere with this connection and complain about non-existent roles and users.
+
+For development of the rust workspace, build manually:
+
+```sh
+cargo build --all-targets --all-features
+```
+
+Or watch and automatically recompile changes:
 
 ```sh
-$ cargo install cargo-watch # if not installed previously
-$ cargo watch -c -w src -x run
+cargo install cargo-watch # if not installed previously
+cargo watch -c
 ```
 
 Test with:
@@ -68,7 +81,7 @@ npx cdklocal deploy
 
 This allows creating events that are ingested.
 
-First, push an object (in order to create a log group):q:
+First, push an object (in order to create a log group):
 ```sh
 awslocal s3api put-object --bucket filemanager-test-ingest --key test
 ```
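As a point of reference for the compile-time checking described in the README changes above, the sketch below shows roughly what an sqlx checked query looks like. It is illustrative only and not part of this patch: the function is made up, and it assumes the `object` table from the migration below and a `DATABASE_URL` pointing at the docker compose postgres.

```rust
// Illustrative sketch only, not code from this patch: a query that sqlx checks
// against the database referenced by DATABASE_URL while the crate compiles.
use sqlx::PgPool;

// Assumes the `object` table created by 0001_add_object_table.sql.
async fn count_objects(pool: &PgPool) -> Result<i64, sqlx::Error> {
    // sqlx::query! validates the SQL and its result types at compile time, which
    // is why the `.env` file providing DATABASE_URL matters during development.
    let record = sqlx::query!(r#"select count(*) as "count!" from object"#)
        .fetch_one(pool)
        .await?;

    Ok(record.count)
}
```

If `DATABASE_URL` is unset or the schema does not match, the `query!` macro fails the build instead of failing at runtime, which is the behaviour the README paragraph describes.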
diff --git a/lib/workload/stateless/filemanager/database/migrations/0001_add_object_table.sql b/lib/workload/stateless/filemanager/database/migrations/0001_add_object_table.sql
index 695b29999..51e7050be 100644
--- a/lib/workload/stateless/filemanager/database/migrations/0001_add_object_table.sql
+++ b/lib/workload/stateless/filemanager/database/migrations/0001_add_object_table.sql
@@ -17,5 +17,6 @@ create table object (
     -- When this object was deleted, a null value means that the object has not yet been deleted.
     deleted_date timestamptz default null,
     -- The date of the object and its id combined.
     portal_run_id varchar(255) not null
+    -- provenance - history of all objects and how they move?
 );
\ No newline at end of file
diff --git a/lib/workload/stateless/filemanager/deploy/stack/filemanager_stack.ts b/lib/workload/stateless/filemanager/deploy/stack/filemanager_stack.ts
index d659abcef..7003c430b 100644
--- a/lib/workload/stateless/filemanager/deploy/stack/filemanager_stack.ts
+++ b/lib/workload/stateless/filemanager/deploy/stack/filemanager_stack.ts
@@ -9,6 +9,9 @@ import * as sqs from 'aws-cdk-lib/aws-sqs';
 import * as lambdaDestinations from 'aws-cdk-lib/aws-lambda-destinations';
 import * as lambdaEventSources from 'aws-cdk-lib/aws-lambda-event-sources';
 
+/**
+ * Common settings for the filemanager stack.
+ */
 interface Settings {
   database_url: string;
   endpoint_url: string;
diff --git a/lib/workload/stateless/filemanager/filemanager-http-lambda/src/main.rs b/lib/workload/stateless/filemanager/filemanager-http-lambda/src/main.rs
index 78d53589b..683bc2405 100644
--- a/lib/workload/stateless/filemanager/filemanager-http-lambda/src/main.rs
+++ b/lib/workload/stateless/filemanager/filemanager-http-lambda/src/main.rs
@@ -10,7 +10,8 @@ use filemanager::events::s3::collect::Collecter;
 use filemanager::events::s3::sqs_client::SQS;
 use filemanager::events::Collect;
 
-/// Handle SQS events.
+/// Handle SQS events by manually calling the SQS receive function. This is meant
+/// to be invoked through something like API Gateway to manually trigger ingestion.
 async fn event_handler(_: LambdaEvent<()>) -> Result<(), Error> {
     let sqs = SQS::with_default_client().await?;
     let events = sqs.receive().await?;
diff --git a/lib/workload/stateless/filemanager/filemanager-ingest-lambda/src/main.rs b/lib/workload/stateless/filemanager/filemanager-ingest-lambda/src/main.rs
index 4f8b3e755..9905c1ae6 100644
--- a/lib/workload/stateless/filemanager/filemanager-ingest-lambda/src/main.rs
+++ b/lib/workload/stateless/filemanager/filemanager-ingest-lambda/src/main.rs
@@ -11,7 +11,7 @@ use filemanager::events::s3::collect::Collecter;
 use filemanager::events::s3::FlatS3EventMessages;
 use filemanager::events::Collect;
 
-/// Handle SQS events.
+/// Handle SQS events that are passed directly to a lambda function.
 async fn event_handler(event: LambdaEvent) -> Result<(), Error> {
     trace!("received event: {:?}", event);
 
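Neither lambda's `main` function appears in this patch. As a hedged sketch of how a handler like the two documented above is typically registered with the `lambda_runtime` crate (the handler body below is a placeholder, not the filemanager's actual receive/collect/ingest logic):

```rust
// Hedged sketch, not part of this patch: wiring an event handler into the
// lambda_runtime entry point used by both lambda crates.
use lambda_runtime::{run, service_fn, Error, LambdaEvent};

async fn event_handler(_: LambdaEvent<()>) -> Result<(), Error> {
    // The real handlers receive SQS messages, collect them into database
    // records and ingest them; this placeholder just succeeds.
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    // service_fn adapts the async function into the Service that run() expects.
    run(service_fn(event_handler)).await
}
```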
diff --git a/lib/workload/stateless/filemanager/filemanager/src/database/mod.rs b/lib/workload/stateless/filemanager/filemanager/src/database/mod.rs
index 88cbeeb14..0dfdaa696 100644
--- a/lib/workload/stateless/filemanager/filemanager/src/database/mod.rs
+++ b/lib/workload/stateless/filemanager/filemanager/src/database/mod.rs
@@ -7,16 +7,19 @@ use crate::events::EventType;
 
 pub mod s3;
 
+/// A database client handles database interaction.
 #[derive(Debug)]
 pub struct DbClient {
     pool: PgPool,
 }
 
 impl DbClient {
+    /// Create a database client from an existing pool.
     pub fn new(pool: PgPool) -> Self {
         Self { pool }
     }
 
+    /// Create a database client using the default `DATABASE_URL` connection.
     pub async fn new_with_defaults() -> Result<Self> {
         let url = std::env::var("DATABASE_URL").map_err(|err| DbClientError(err.to_string()))?;
 
@@ -25,6 +28,7 @@ impl DbClient {
         })
     }
 
+    /// Get the database pool.
     pub fn pool(&self) -> &PgPool {
         &self.pool
     }
diff --git a/lib/workload/stateless/filemanager/filemanager/src/database/s3/ingester.rs b/lib/workload/stateless/filemanager/filemanager/src/database/s3/ingester.rs
index 6531bea9a..6ecd2ac2b 100644
--- a/lib/workload/stateless/filemanager/filemanager/src/database/s3/ingester.rs
+++ b/lib/workload/stateless/filemanager/filemanager/src/database/s3/ingester.rs
@@ -16,16 +16,19 @@ pub struct Ingester {
 }
 
 impl Ingester {
+    /// Create a new ingester.
     pub fn new(db: DbClient) -> Self {
         Self { db }
     }
 
+    /// Create a new ingester with a default database client.
     pub async fn new_with_defaults() -> Result<Self> {
         Ok(Self {
             db: DbClient::new_with_defaults().await?,
         })
     }
 
+    /// Ingest the events into the database by calling the insert and update queries.
     pub async fn ingest_s3_events(&mut self, events: Events) -> Result<()> {
         let Events {
             object_created,
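As a hedged illustration of how the `DbClient` and `Ingester` APIs documented above fit together, the sketch below builds an ingester from the default `DATABASE_URL` and ingests a batch of already-collected events. The wrapper function is made up, and the import paths are inferred from the file layout rather than confirmed by this patch.

```rust
// Hedged sketch, not part of this patch: ingest collected S3 events using the
// Ingester documented above. Import paths are inferred from the module layout.
use filemanager::database::s3::ingester::Ingester;
use filemanager::events::s3::Events;

// `events` would normally come from the collecter; here it is just a parameter.
async fn ingest(events: Events) -> Result<(), Box<dyn std::error::Error>> {
    // Reads DATABASE_URL and connects to the postgres pool.
    let mut ingester = Ingester::new_with_defaults().await?;

    // Runs the insert and update queries for created and deleted objects.
    ingester.ingest_s3_events(events).await?;

    Ok(())
}
```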
diff --git a/lib/workload/stateless/filemanager/filemanager/src/env.rs b/lib/workload/stateless/filemanager/filemanager/src/env.rs
index 991b50e6e..4db75893b 100644
--- a/lib/workload/stateless/filemanager/filemanager/src/env.rs
+++ b/lib/workload/stateless/filemanager/filemanager/src/env.rs
@@ -6,9 +6,12 @@ use std::{
 use dotenvy;
 use tracing::{error, info};
 
+/// Controls the environment configuration.
 #[derive(PartialEq)]
 pub enum AppEnv {
+    /// The dev environment loads config from a local dotenv file.
     Dev,
+    /// The prod environment doesn't load local dotenv config.
     Prod,
 }
 
diff --git a/lib/workload/stateless/filemanager/filemanager/src/error.rs b/lib/workload/stateless/filemanager/filemanager/src/error.rs
index e9552337c..4de4199c1 100644
--- a/lib/workload/stateless/filemanager/filemanager/src/error.rs
+++ b/lib/workload/stateless/filemanager/filemanager/src/error.rs
@@ -5,6 +5,7 @@ use thiserror::Error;
 
 pub type Result<T> = result::Result<T, Error>;
 
+/// Error types for the filemanager.
 #[derive(Error, Debug)]
 pub enum Error {
     /// File not found by id.
diff --git a/lib/workload/stateless/filemanager/filemanager/src/events/s3/collect.rs b/lib/workload/stateless/filemanager/filemanager/src/events/s3/collect.rs
index 116083fdf..75f03c269 100644
--- a/lib/workload/stateless/filemanager/filemanager/src/events/s3/collect.rs
+++ b/lib/workload/stateless/filemanager/filemanager/src/events/s3/collect.rs
@@ -5,6 +5,7 @@ use crate::events::s3::s3_client::S3;
 use crate::events::s3::{Events, FlatS3EventMessages};
 use crate::events::{Collect, EventType};
 
+/// Collect raw events into the processed form which the database module accepts.
 #[derive(Debug)]
 pub struct Collecter {
     s3: S3,
@@ -12,10 +13,12 @@
 }
 
 impl Collecter {
+    /// Create a new collector.
     pub fn new(s3: S3, raw_events: FlatS3EventMessages) -> Self {
         Self { s3, raw_events }
     }
 
+    /// Create a new collector with a default S3 client.
     pub async fn with_defaults(raw_events: FlatS3EventMessages) -> Result<Self> {
         Ok(Self {
             s3: S3::with_defaults().await?,
diff --git a/lib/workload/stateless/filemanager/filemanager/src/events/s3/mod.rs b/lib/workload/stateless/filemanager/filemanager/src/events/s3/mod.rs
index 41ad2efca..09f3b635b 100644
--- a/lib/workload/stateless/filemanager/filemanager/src/events/s3/mod.rs
+++ b/lib/workload/stateless/filemanager/filemanager/src/events/s3/mod.rs
@@ -14,6 +14,7 @@ pub mod collect;
 pub mod s3_client;
 pub mod sqs_client;
 
+/// A wrapper around AWS storage types with sqlx support.
 #[derive(Debug, Eq, PartialEq, sqlx::Type)]
 #[sqlx(type_name = "storage_class")]
 pub enum StorageClass {
@@ -36,6 +37,7 @@ impl PgHasArrayType for StorageClass {
 }
 
 impl StorageClass {
+    /// Convert from the AWS storage class type to the filemanager storage class.
     pub fn from_aws(storage_class: AwsStorageClass) -> Option<Self> {
         match storage_class {
             AwsStorageClass::DeepArchive => Some(Self::DeepArchive),
@@ -164,9 +166,9 @@ impl From for Events {
     }
 }
 
+/// Flattened AWS S3 events.
 #[derive(Debug, Deserialize, Eq, PartialEq, Default)]
 #[serde(try_from = "S3EventMessage")]
-/// Flattened AWS S3 events
 pub struct FlatS3EventMessages(pub Vec<FlatS3EventMessage>);
 
 impl FlatS3EventMessages {
diff --git a/lib/workload/stateless/filemanager/filemanager/src/events/s3/s3_client.rs b/lib/workload/stateless/filemanager/filemanager/src/events/s3/s3_client.rs
index caba8970e..8a6655baa 100644
--- a/lib/workload/stateless/filemanager/filemanager/src/events/s3/s3_client.rs
+++ b/lib/workload/stateless/filemanager/filemanager/src/events/s3/s3_client.rs
@@ -10,16 +10,19 @@ use crate::error::Error::S3Error;
 use crate::error::Result;
 use crate::events::s3::{FlatS3EventMessage, FlatS3EventMessages, StorageClass};
 
+/// A wrapper around an S3 client.
 #[derive(Debug)]
 pub struct S3 {
     s3_client: Client,
 }
 
 impl S3 {
+    /// Create a new S3 client wrapper.
     pub fn new(s3_client: Client) -> Self {
         Self { s3_client }
     }
 
+    /// Create with a default S3 client.
     pub async fn with_defaults() -> Result<Self> {
         let config = aws_config::from_env().load().await;
         let mut config = config::Builder::from(&config);
@@ -68,6 +71,7 @@ impl S3 {
         }
     }
 
+    /// Convert an AWS datetime to a standard database format.
     pub fn convert_datetime(
         datetime: Option,
     ) -> Option> {
@@ -79,6 +83,7 @@ impl S3 {
         }
     }
 
+    /// Process events and add header and datetime fields.
     pub async fn update_events(&self, events: FlatS3EventMessages) -> Result<FlatS3EventMessages> {
         Ok(FlatS3EventMessages(
             join_all(events.into_inner().into_iter().map(|mut event| async move {
diff --git a/lib/workload/stateless/filemanager/filemanager/src/events/s3/sqs_client.rs b/lib/workload/stateless/filemanager/filemanager/src/events/s3/sqs_client.rs
index f3e3f5416..04876d930 100644
--- a/lib/workload/stateless/filemanager/filemanager/src/events/s3/sqs_client.rs
+++ b/lib/workload/stateless/filemanager/filemanager/src/events/s3/sqs_client.rs
@@ -7,6 +7,7 @@ use crate::error::Error::{DbClientError, DeserializeError, SQSReceiveError};
 use crate::error::Result;
 use crate::events::s3::FlatS3EventMessages;
 
+/// A wrapper around an SQS client.
 #[derive(Debug)]
 pub struct SQS {
     sqs_client: Client,
@@ -14,6 +15,7 @@
 }
 
 impl SQS {
+    /// Create a new SQS client wrapper.
     pub fn new(sqs_client: Client, sqs_url: String) -> Self {
         Self {
             sqs_client,
@@ -21,6 +23,8 @@
         }
     }
 
+
+    /// Create with a default SQS client.
     pub async fn with_default_client() -> Result<Self> {
         let config = aws_config::from_env().load().await;
         let mut config = config::Builder::from(&config);
@@ -37,7 +41,7 @@
         })
     }
 
-    // TODO: Two possible event types, should be handled differently: PUT and DELETE
+    /// Manually call the receive function to retrieve events from the SQS queue.
    pub async fn receive(&self) -> Result<FlatS3EventMessages> {
         let rcv_message_output = self
             .sqs_client