From cfea52eb37930531981f83f93b8eb70c3fc50a3c Mon Sep 17 00:00:00 2001 From: Marko Malenic Date: Wed, 19 Jun 2024 13:46:29 +1000 Subject: [PATCH 01/12] feat(filemanager): create config struct for all environment variables --- .../stateless/stacks/filemanager/Cargo.lock | 1 + .../filemanager/filemanager-build/src/lib.rs | 2 + .../stacks/filemanager/filemanager/Cargo.toml | 3 +- .../stacks/filemanager/filemanager/src/env.rs | 80 +++++++++++++++++++ .../filemanager/filemanager/src/error.rs | 19 ++++- .../stacks/filemanager/filemanager/src/lib.rs | 8 +- 6 files changed, 101 insertions(+), 12 deletions(-) create mode 100644 lib/workload/stateless/stacks/filemanager/filemanager/src/env.rs diff --git a/lib/workload/stateless/stacks/filemanager/Cargo.lock b/lib/workload/stateless/stacks/filemanager/Cargo.lock index f184cd264..e114b5430 100644 --- a/lib/workload/stateless/stacks/filemanager/Cargo.lock +++ b/lib/workload/stateless/stacks/filemanager/Cargo.lock @@ -1840,6 +1840,7 @@ dependencies = [ "chrono", "csv", "dotenvy", + "envy", "filemanager", "filemanager-build", "flate2", diff --git a/lib/workload/stateless/stacks/filemanager/filemanager-build/src/lib.rs b/lib/workload/stateless/stacks/filemanager/filemanager-build/src/lib.rs index 7b7483a4b..e56165a33 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager-build/src/lib.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager-build/src/lib.rs @@ -9,6 +9,7 @@ use std::path::{Path, PathBuf}; pub mod entities; pub mod error; +/// Configuration environment variables for the build process. #[derive(Debug, Deserialize)] pub struct Config { database_url: String, @@ -16,6 +17,7 @@ pub struct Config { } impl Config { + /// Load environment variables into a `Config` struct. #[track_caller] pub fn load() -> Result { Ok(from_env::()?) 
diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/Cargo.toml b/lib/workload/stateless/stacks/filemanager/filemanager/Cargo.toml index 91e086273..56c8e925f 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/Cargo.toml +++ b/lib/workload/stateless/stacks/filemanager/filemanager/Cargo.toml @@ -23,7 +23,8 @@ tracing = "0.1" tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt"] } sqlx = { version = "0.7", features = ["postgres", "runtime-tokio", "tls-rustls", "chrono", "uuid"] } -sea-orm = { version = "0.12", features = ["sqlx-postgres", "runtime-tokio-rustls"] } +sea-orm = { version = "0.12", features = ["sqlx-postgres", "runtime-tokio-rustls", "macros"] } +envy = "0.4" chrono = { version = "0.4", features = ["serde"] } thiserror = "1" uuid = { version = "1", features = ["v7"] } diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/env.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/env.rs new file mode 100644 index 000000000..f04225f94 --- /dev/null +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/env.rs @@ -0,0 +1,80 @@ +//! Handles loading environment variables as config options for filemanager. +//! + +use std::result; +use envy::from_env; +use serde::{Deserialize, Deserializer}; +use serde::de::Error; +use crate::error::Result; + +/// Configuration environment variables for filemanager. 
+#[derive(Debug, Deserialize)] +pub struct Config { + database_url: Option, + pgpassword: Option, + pghost: Option, + pgport: Option, + pguser: Option, + sqs_queue_url: Option, + #[serde(deserialize_with = "deserialize_bool_with_num")] + paired_ingest_mode: bool, +} + +fn deserialize_bool_with_num<'de, D>(deserializer: D) -> result::Result + where + D: Deserializer<'de>, +{ + let value: Option = Deserialize::deserialize(deserializer)?; + + Ok(value.map(|value| { + if value == "1" { + Ok(true) + } else if value == "0" { + Ok(false) + } else { + value.parse::() + } + }).transpose().map_err(|err| Error::custom(err))?.unwrap_or_default()) +} + +impl Config { + /// Load environment variables into a `Config` struct. + pub fn load() -> Result { + Ok(from_env::()?) + } + + /// Get the database url. + pub fn database_url(&self) -> Option<&str> { + self.database_url.as_deref() + } + + /// Get the pg password. + pub fn pg_password(&self) -> Option<&str> { + self.pgpassword.as_deref() + } + + /// Get the pg host. + pub fn pg_host(&self) -> Option<&str> { + self.pghost.as_deref() + } + + /// Get the pg port. + pub fn pg_port(&self) -> Option<&str> { + self.pgport.as_deref() + } + + /// Get the pg user. + pub fn pg_user(&self) -> Option<&str> { + self.pguser.as_deref() + } + + /// Get the SQS url. + pub fn sqs_queue_url(&self) -> Option<&str> { + self.sqs_queue_url.as_deref() + } + + /// Get the paired ingest mode. + pub fn paired_ingest_mode(&self) -> bool { + self.paired_ingest_mode + } +} \ No newline at end of file diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/error.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/error.rs index 46c774ef3..0b4245d09 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/error.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/error.rs @@ -2,6 +2,7 @@ //! 
use std::result; +use sea_orm::DbErr; use sqlx::migrate::MigrateError; use thiserror::Error; @@ -19,10 +20,8 @@ pub enum Error { SQSError(String), #[error("deserialization error: `{0}`")] DeserializeError(String), - #[error("Missing environment variable: `{0}`")] - MissingEnvironmentVariable(String), - #[error("Invalid environment variable: `{0}`")] - InvalidEnvironmentVariable(String), + #[error("Loading environment variables: `{0}`")] + LoadingEnvironment(String), #[error("credential generator error: `{0}`")] CredentialGeneratorError(String), #[error("S3 inventory error: `{0}`")] @@ -35,6 +34,12 @@ impl From for Error { } } +impl From for Error { + fn from(err: DbErr) -> Self { + Self::SQLError(err.to_string()) + } +} + impl From for Error { fn from(err: MigrateError) -> Self { Self::SQLError(err.to_string()) @@ -46,3 +51,9 @@ impl From for Error { Self::DeserializeError(err.to_string()) } } + +impl From for Error { + fn from(error: envy::Error) -> Self { + Self::LoadingEnvironment(error.to_string()) + } +} \ No newline at end of file diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/lib.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/lib.rs index 423cffb48..03d6f34c2 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/lib.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/lib.rs @@ -2,17 +2,11 @@ //! logic. //! -use crate::error::Error::MissingEnvironmentVariable; -use crate::error::Result; - pub mod clients; pub mod database; pub mod error; pub mod events; pub mod handlers; pub mod uuid; +pub mod env; -/// Read an environment variable into a string. 
-pub fn read_env>(key: K) -> Result { - std::env::var(key.as_ref()).map_err(|err| MissingEnvironmentVariable(err.to_string())) -} From 4138ba5bf623efdca653ac4071c77c39d646d058 Mon Sep 17 00:00:00 2001 From: Marko Malenic Date: Thu, 20 Jun 2024 15:11:17 +1000 Subject: [PATCH 02/12] refactor(filemanager): use sea-orm database connection --- .../filemanager-build/src/entities.rs | 4 +- .../filemanager-http-lambda/src/main.rs | 7 +- .../filemanager-ingest-lambda/src/main.rs | 7 +- .../filemanager-inventory-lambda/src/main.rs | 9 +- .../filemanager-migrate-lambda/src/main.rs | 6 +- .../stacks/filemanager/filemanager/Cargo.toml | 2 +- .../src/database/aws/credentials.rs | 45 ++++------ .../filemanager/src/database/aws/ingester.rs | 12 +-- .../src/database/aws/ingester_paired.rs | 12 +-- .../filemanager/src/database/aws/migration.rs | 12 +-- .../filemanager/src/database/aws/query.rs | 2 +- .../filemanager/src/database/mod.rs | 65 ++++++++++---- .../stacks/filemanager/filemanager/src/env.rs | 77 +++++++++------- .../filemanager/filemanager/src/error.rs | 4 +- .../filemanager/src/events/aws/collecter.rs | 83 ++++++++--------- .../filemanager/src/handlers/aws.rs | 90 +++++++++++++------ .../stacks/filemanager/filemanager/src/lib.rs | 3 +- 17 files changed, 260 insertions(+), 180 deletions(-) diff --git a/lib/workload/stateless/stacks/filemanager/filemanager-build/src/entities.rs b/lib/workload/stateless/stacks/filemanager/filemanager-build/src/entities.rs index 64c5ff555..04235b68f 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager-build/src/entities.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager-build/src/entities.rs @@ -6,10 +6,10 @@ use crate::error::ErrorKind::EntityGeneration; use crate::error::{Error, Result}; use crate::Config; use clap_builder::Parser; +use quote::quote; use sea_orm_cli::{run_generate_command, Cli, Commands}; use std::ffi::OsStr; use std::fs::write; -use quote::quote; pub async fn generate_entities() -> Result<()> { 
let config = Config::load()?; @@ -43,6 +43,6 @@ pub async fn generate_entities() -> Result<()> { ); write(out_dir.join("entities.rs"), generated.to_string())?; - + Ok(()) } diff --git a/lib/workload/stateless/stacks/filemanager/filemanager-http-lambda/src/main.rs b/lib/workload/stateless/stacks/filemanager/filemanager-http-lambda/src/main.rs index 39c403091..9a97abc6d 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager-http-lambda/src/main.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager-http-lambda/src/main.rs @@ -1,3 +1,4 @@ +use filemanager::env::Config; use lambda_http::Error; use lambda_runtime::{run, service_fn, LambdaEvent}; @@ -11,15 +12,17 @@ use filemanager::handlers::init_tracing; async fn main() -> Result<(), Error> { init_tracing(); - let options = &create_database_pool().await?; + let config = &Config::load()?; + let options = &create_database_pool(config).await?; run(service_fn(|_: LambdaEvent<()>| async move { - update_credentials(options).await?; + update_credentials(options, config).await?; receive_and_ingest( S3Client::with_defaults().await, SQSClient::with_defaults().await, None::, DbClient::from_ref(options), + config, ) .await?; diff --git a/lib/workload/stateless/stacks/filemanager/filemanager-ingest-lambda/src/main.rs b/lib/workload/stateless/stacks/filemanager/filemanager-ingest-lambda/src/main.rs index e686f5cf9..ada051b7b 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager-ingest-lambda/src/main.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager-ingest-lambda/src/main.rs @@ -3,6 +3,7 @@ use lambda_runtime::{run, service_fn, Error, LambdaEvent}; use filemanager::clients::aws::s3::Client; use filemanager::database::Client as DbClient; +use filemanager::env::Config; use filemanager::handlers::aws::{create_database_pool, ingest_event, update_credentials}; use filemanager::handlers::init_tracing; @@ -10,14 +11,16 @@ use filemanager::handlers::init_tracing; async fn main() -> Result<(), 
Error> { init_tracing(); - let options = &create_database_pool().await?; + let config = &Config::load()?; + let options = &create_database_pool(config).await?; run(service_fn(|event: LambdaEvent| async move { - update_credentials(options).await?; + update_credentials(options, config).await?; ingest_event( event.payload, Client::with_defaults().await, DbClient::from_ref(options), + config, ) .await?; diff --git a/lib/workload/stateless/stacks/filemanager/filemanager-inventory-lambda/src/main.rs b/lib/workload/stateless/stacks/filemanager/filemanager-inventory-lambda/src/main.rs index 2caa274b0..9c1b6cf42 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager-inventory-lambda/src/main.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager-inventory-lambda/src/main.rs @@ -3,6 +3,7 @@ use lambda_runtime::{run, service_fn, Error, LambdaEvent}; use serde::Deserialize; use filemanager::database::Client as DbClient; +use filemanager::env::Config; use filemanager::events::aws::inventory::Manifest; use filemanager::handlers::aws::{create_database_pool, ingest_s3_inventory, update_credentials}; use filemanager::handlers::init_tracing; @@ -26,9 +27,10 @@ pub struct BucketKey { async fn main() -> Result<(), Error> { init_tracing(); - let options = &create_database_pool().await?; + let config = &Config::load()?; + let options = &create_database_pool(config).await?; run(service_fn(|event: LambdaEvent| async move { - update_credentials(options).await?; + update_credentials(options, config).await?; let client = Client::with_defaults().await; let database = DbClient::from_ref(options); @@ -41,11 +43,12 @@ async fn main() -> Result<(), Error> { Some(bucket_key.bucket), Some(bucket_key.key), None, + config, ) .await? } Request::Manifest(manifest) => { - ingest_s3_inventory(client, database, None, None, Some(manifest)).await? + ingest_s3_inventory(client, database, None, None, Some(manifest), config).await? 
} }; diff --git a/lib/workload/stateless/stacks/filemanager/filemanager-migrate-lambda/src/main.rs b/lib/workload/stateless/stacks/filemanager/filemanager-migrate-lambda/src/main.rs index 4eef907f4..a412827a3 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager-migrate-lambda/src/main.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager-migrate-lambda/src/main.rs @@ -7,6 +7,7 @@ use crate::Event::Provider; use filemanager::database::aws::migration::Migration; use filemanager::database::Client as DbClient; use filemanager::database::Migrate; +use filemanager::env::Config; use filemanager::handlers::aws::{create_database_pool, update_credentials}; use filemanager::handlers::init_tracing; @@ -32,9 +33,10 @@ pub enum CloudFormationRequest { async fn main() -> Result<(), Error> { init_tracing(); - let options = &create_database_pool().await?; + let config = &Config::load()?; + let options = &create_database_pool(config).await?; run(service_fn(|event: LambdaEvent| async move { - update_credentials(options).await?; + update_credentials(options, config).await?; // Migrate depending on the type of lifecycle event using the CDK provider framework: // https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.custom_resources-readme.html#provider-framework diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/Cargo.toml b/lib/workload/stateless/stacks/filemanager/filemanager/Cargo.toml index 56c8e925f..169c32435 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/Cargo.toml +++ b/lib/workload/stateless/stacks/filemanager/filemanager/Cargo.toml @@ -23,7 +23,7 @@ tracing = "0.1" tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt"] } sqlx = { version = "0.7", features = ["postgres", "runtime-tokio", "tls-rustls", "chrono", "uuid"] } -sea-orm = { version = "0.12", features = ["sqlx-postgres", "runtime-tokio-rustls", "macros"] } +sea-orm = { version = "0.12", features = ["sqlx-postgres", 
"runtime-tokio-rustls", "macros", "sea-orm-internal"] } envy = "0.4" chrono = { version = "0.4", features = ["serde"] } thiserror = "1" diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/credentials.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/credentials.rs index f45480768..00812bff4 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/credentials.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/credentials.rs @@ -14,9 +14,9 @@ use url::Url; #[double] use crate::clients::aws::config::Config; use crate::database::CredentialGenerator; +use crate::env::Config as EnvConfig; use crate::error::Error::CredentialGeneratorError; use crate::error::Result; -use crate::read_env; /// Number of seconds that the IAM credentials expire in. /// Equals the max Lambda timeout. @@ -57,32 +57,18 @@ impl IamGeneratorBuilder { } /// Build the credential generator. - pub async fn build(self) -> Result { + pub async fn build(self, env_config: &EnvConfig) -> Result { let config = if let Some(config) = self.config { config } else { Config::with_defaults().await }; - let host = if let Some(host) = self.host { - host - } else { - read_env("PGHOST")? - }; - - let port = if let Some(port) = self.port { - port - } else { - read_env("PGPORT")?.parse().map_err(|_| { - CredentialGeneratorError("failed to parse port from PGPORT".to_string()) - })? - }; - - let user = if let Some(user) = self.user { - user - } else { - read_env("PGUSER")? 
- }; + let host = + EnvConfig::value_or_else(self.host.as_deref(), env_config.pg_host())?.to_string(); + let port = EnvConfig::value_or_else(self.port, env_config.pg_port())?; + let user = + EnvConfig::value_or_else(self.user.as_deref(), env_config.pg_user())?.to_string(); Ok(IamGenerator::new(config, host, port, user)) } @@ -192,7 +178,6 @@ impl CredentialGenerator for IamGenerator { mod tests { use std::borrow::Cow; use std::collections::HashMap; - use std::env::set_var; use std::future::Future; use aws_credential_types::Credentials; @@ -209,7 +194,7 @@ mod tests { .with_host("127.0.0.1".to_string()) .with_port(5432) .with_user("filemanager".to_string()) - .build() + .build(&Default::default()) .await .unwrap() }) @@ -218,14 +203,20 @@ mod tests { #[tokio::test] async fn generate_iam_token_env() { - set_var("PGHOST", "127.0.0.1"); - set_var("PGPORT", "5432"); - set_var("PGUSER", "filemanager"); + let env_config = EnvConfig { + database_url: None, + pgpassword: None, + pghost: Some("127.0.0.1".to_string()), + pgport: Some(5432), + pguser: Some("filemanager".to_string()), + sqs_queue_url: None, + paired_ingest_mode: false, + }; test_generate_iam_token(|config| async { IamGeneratorBuilder::default() .with_config(config) - .build() + .build(&env_config) .await .unwrap() }) diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/ingester.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/ingester.rs index d67a5f141..57a37d9c7 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/ingester.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/ingester.rs @@ -7,6 +7,7 @@ use tracing::debug; use uuid::Uuid; use crate::database::{Client, CredentialGenerator}; +use crate::env::Config; use crate::error::Result; use crate::events::aws::message::EventType; use crate::events::aws::{StorageClass, TransposedS3EventMessages}; @@ -31,10 +32,11 @@ impl<'a> Ingester<'a> { } 
/// Create a new ingester with a default database client. - pub async fn with_defaults(generator: Option) -> Result { - Ok(Self { - client: Client::from_generator(generator).await?, - }) + pub async fn with_defaults( + generator: Option, + config: &Config, + ) -> Result { + Ok(Self::new(Client::from_generator(generator, config).await?)) } /// Reprocess inserts to find object ids that are not duplicates from the inserted events. @@ -1528,6 +1530,6 @@ pub(crate) mod tests { } pub(crate) fn test_ingester<'a>(pool: PgPool) -> Client<'a> { - Client::new(pool) + Client::from_pool(pool) } } diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/ingester_paired.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/ingester_paired.rs index 58d44d9d4..f491e9919 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/ingester_paired.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/ingester_paired.rs @@ -8,6 +8,7 @@ use tracing::{debug, trace}; use uuid::Uuid; use crate::database::{Client, CredentialGenerator}; +use crate::env::Config; use crate::error::Result; use crate::events::aws::inventory::Inventory; use crate::events::aws::message::EventType; @@ -36,10 +37,11 @@ impl<'a> IngesterPaired<'a> { } /// Create a new ingester with a default database client. 
- pub async fn with_defaults(generator: Option) -> Result { - Ok(Self { - client: Client::from_generator(generator).await?, - }) + pub async fn with_defaults( + generator: Option, + config: &Config, + ) -> Result { + Ok(Self::new(Client::from_generator(generator, config).await?)) } fn reprocess_updated( @@ -1808,6 +1810,6 @@ pub(crate) mod tests { } pub(crate) fn test_ingester<'a>(pool: PgPool) -> Client<'a> { - Client::new(pool) + Client::from_pool(pool) } } diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/migration.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/migration.rs index 04843d483..4228c292c 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/migration.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/migration.rs @@ -7,6 +7,7 @@ use sqlx::migrate::Migrator; use tracing::trace; use crate::database::{Client, CredentialGenerator, Migrate}; +use crate::env::Config; use crate::error::Error::MigrateError; use crate::error::Result; @@ -23,10 +24,11 @@ impl<'a> Migration<'a> { } /// Create a new migration with a default database client. - pub async fn with_defaults(generator: Option) -> Result { - Ok(Self { - client: Client::from_generator(generator).await?, - }) + pub async fn with_defaults( + generator: Option, + config: &Config, + ) -> Result { + Ok(Self::new(Client::from_generator(generator, config).await?)) } /// Get the underlying sqlx migrator for the migrations. 
@@ -64,7 +66,7 @@ pub(crate) mod tests { #[sqlx::test(migrations = false)] async fn test_migrate(pool: PgPool) { - let migrate = Migration::new(Client::new(pool)); + let migrate = Migration::new(Client::from_pool(pool)); let not_exists = sqlx::query!( "select exists (select from information_schema.tables where table_name = 'object')" diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/query.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/query.rs index 53010daab..be55e7f8c 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/query.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/query.rs @@ -95,7 +95,7 @@ mod tests { #[sqlx::test(migrator = "MIGRATOR")] async fn test_select_existing_by_bucket_key(pool: PgPool) { let ingester = test_ingester(pool.clone()); - let query = Query::new(Client::new(pool)); + let query = Query::new(Client::from_pool(pool)); let events = test_events(Some(Created)); diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/mod.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/mod.rs index 28c4e8321..c861bcb7b 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/mod.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/mod.rs @@ -5,14 +5,15 @@ use std::borrow::Cow; use crate::database::aws::ingester::Ingester; use crate::database::aws::ingester_paired::IngesterPaired; +use crate::env::Config; use async_trait::async_trait; +use sea_orm::{ConnectOptions, Database, DatabaseConnection, SqlxPostgresConnector}; use sqlx::postgres::PgConnectOptions; -use sqlx::PgPool; +use sqlx::{ConnectOptions as SqlxConnectOptions, PgPool}; use tracing::debug; use crate::error::Result; use crate::events::EventSourceType; -use crate::read_env; pub mod aws; @@ -30,34 +31,57 @@ pub trait CredentialGenerator { #[derive(Debug, Clone)] pub struct Client<'a> { 
// Use a Cow here to allow an owned pool or a shared reference to a pool. - pool: Cow<'a, PgPool>, + connection: Cow<'a, DatabaseConnection>, } impl<'a> Client<'a> { /// Create a database from an existing pool. - pub fn new(pool: PgPool) -> Self { + pub fn new(connection: DatabaseConnection) -> Self { Self { - pool: Cow::Owned(pool), + connection: Cow::Owned(connection), } } /// Create a database from a reference to an existing pool. - pub fn from_ref(pool: &'a PgPool) -> Self { + pub fn from_ref(connection: &'a DatabaseConnection) -> Self { Self { - pool: Cow::Borrowed(pool), + connection: Cow::Borrowed(connection), } } + /// Create a database connection from an existing pool. + pub fn from_pool(pool: PgPool) -> Self { + Self::new(SqlxPostgresConnector::from_sqlx_postgres_pool(pool)) + } + /// Create a database using default credential loading logic as defined in /// `Self::connect_options`. - pub async fn from_generator(generator: Option) -> Result { - Ok(Self::new(Self::create_pool(generator).await?)) + pub async fn from_generator( + generator: Option, + config: &Config, + ) -> Result { + Ok(Self::new(Self::create_pool(generator, config).await?)) } /// Create a database connection pool using credential loading logic defined in /// `Self::connect_options`. - pub async fn create_pool(generator: Option) -> Result { - Ok(PgPool::connect_with(Self::connect_options(generator).await?).await?) + pub async fn create_pool( + generator: Option, + config: &Config, + ) -> Result { + Ok(Database::connect(Self::connect_options(generator, config).await?).await?) + } + + /// Create database connect options using a series of credential loading logic. + pub async fn connect_options( + generator: Option, + config: &Config, + ) -> Result { + Ok(ConnectOptions::new( + Self::pg_connect_options(generator, config) + .await? + .to_url_lossy(), + )) } /// Create database connect options using a series of credential loading logic. 
@@ -65,15 +89,17 @@ impl<'a> Client<'a> { /// First, this tries to load a DATABASE_URL environment variable to connect. /// Then, it uses the generator if it is not None and PGPASSWORD is not set. /// Otherwise, uses default logic defined in PgConnectOptions::default. - pub async fn connect_options( + pub async fn pg_connect_options( generator: Option, + config: &Config, ) -> Result { // If the DATABASE_URL is defined, use that. - if let Ok(url) = read_env("DATABASE_URL") { + if let Some(url) = config.database_url() { return Ok(url.parse()?); } + // If PGPASSWORD is set, use default options. - if read_env("PGPASSWORD").is_ok() { + if config.pg_password().is_some() { return Ok(PgConnectOptions::default()); } @@ -89,7 +115,12 @@ impl<'a> Client<'a> { /// Get the database pool. pub fn pool(&self) -> &PgPool { - &self.pool + self.connection.get_postgres_connection_pool() + } + + /// Get the database connection. + pub fn connection(&self) -> &DatabaseConnection { + &self.connection } } @@ -98,12 +129,12 @@ impl<'a> Ingest<'a> for Client<'a> { async fn ingest(&'a self, events: EventSourceType) -> Result<()> { match events { EventSourceType::S3(events) => { - Ingester::new(Self::from_ref(self.pool())) + Ingester::new(Self::from_ref(self.connection())) .ingest_events(events) .await } EventSourceType::S3Paired(events) => { - IngesterPaired::new(Self::from_ref(self.pool())) + IngesterPaired::new(Self::from_ref(self.connection())) .ingest_events(events) .await } diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/env.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/env.rs index f04225f94..e3fe32d2c 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/env.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/env.rs @@ -1,40 +1,45 @@ //! Handles loading environment variables as config options for filemanager. //! 
-use std::result; +use crate::error::Error::LoadingEnvironment; +use crate::error::Result; use envy::from_env; -use serde::{Deserialize, Deserializer}; use serde::de::Error; -use crate::error::Result; +use serde::{Deserialize, Deserializer}; +use std::result; /// Configuration environment variables for filemanager. -#[derive(Debug, Deserialize)] +#[derive(Debug, Deserialize, Default)] pub struct Config { - database_url: Option, - pgpassword: Option, - pghost: Option, - pgport: Option, - pguser: Option, - sqs_queue_url: Option, + pub(crate) database_url: Option, + pub(crate) pgpassword: Option, + pub(crate) pghost: Option, + pub(crate) pgport: Option, + pub(crate) pguser: Option, + pub(crate) sqs_queue_url: Option, #[serde(deserialize_with = "deserialize_bool_with_num")] - paired_ingest_mode: bool, + pub(crate) paired_ingest_mode: bool, } fn deserialize_bool_with_num<'de, D>(deserializer: D) -> result::Result - where - D: Deserializer<'de>, +where + D: Deserializer<'de>, { let value: Option = Deserialize::deserialize(deserializer)?; - - Ok(value.map(|value| { - if value == "1" { - Ok(true) - } else if value == "0" { - Ok(false) - } else { - value.parse::() - } - }).transpose().map_err(|err| Error::custom(err))?.unwrap_or_default()) + + Ok(value + .map(|value| { + if value == "1" { + Ok(true) + } else if value == "0" { + Ok(false) + } else { + value.parse::() + } + }) + .transpose() + .map_err(Error::custom)? + .unwrap_or_default()) } impl Config { @@ -42,7 +47,7 @@ impl Config { pub fn load() -> Result { Ok(from_env::()?) } - + /// Get the database url. pub fn database_url(&self) -> Option<&str> { self.database_url.as_deref() @@ -54,22 +59,22 @@ impl Config { } /// Get the pg host. - pub fn pg_host(&self) -> Option<&str> { + pub fn pg_host(&self) -> Option<&str> { self.pghost.as_deref() } /// Get the pg port. - pub fn pg_port(&self) -> Option<&str> { - self.pgport.as_deref() + pub fn pg_port(&self) -> Option { + self.pgport } /// Get the pg user. 
- pub fn pg_user(&self) -> Option<&str> { + pub fn pg_user(&self) -> Option<&str> { self.pguser.as_deref() } /// Get the SQS url. - pub fn sqs_queue_url(&self) -> Option<&str> { + pub fn sqs_queue_url(&self) -> Option<&str> { self.sqs_queue_url.as_deref() } @@ -77,4 +82,16 @@ impl Config { pub fn paired_ingest_mode(&self) -> bool { self.paired_ingest_mode } -} \ No newline at end of file + + /// Get the value from an optional, or else try and get a different value, unwrapping into a Result. + pub fn value_or_else(value: Option, or_else: Option) -> Result { + value + .map(Ok) + .unwrap_or_else(|| Self::value_into_err(or_else)) + } + + /// Convert an optional value to a missing environment variable error. + pub fn value_into_err(value: Option) -> Result { + value.ok_or_else(|| LoadingEnvironment("missing environment variable".to_string())) + } +} diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/error.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/error.rs index 0b4245d09..dd41f96c6 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/error.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/error.rs @@ -1,8 +1,8 @@ //! Errors used by the filemanager crate. //! 
-use std::result; use sea_orm::DbErr; +use std::result; use sqlx::migrate::MigrateError; use thiserror::Error; @@ -56,4 +56,4 @@ impl From for Error { fn from(error: envy::Error) -> Self { Self::LoadingEnvironment(error.to_string()) } -} \ No newline at end of file +} diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/collecter.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/collecter.rs index 0993e4b67..bf76d76da 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/collecter.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/collecter.rs @@ -17,14 +17,13 @@ use crate::clients::aws::s3::Client; use crate::clients::aws::s3::Client as S3Client; #[double] use crate::clients::aws::sqs::Client as SQSClient; -use crate::error::Error::InvalidEnvironmentVariable; +use crate::env::Config; use crate::error::Error::{DeserializeError, SQSError}; use crate::error::Result; use crate::events::aws::{ EventType, FlatS3EventMessage, FlatS3EventMessages, StorageClass, TransposedS3EventMessages, }; use crate::events::{Collect, EventSourceType}; -use crate::read_env; /// Build an AWS collector struct. #[derive(Default, Debug)] @@ -59,11 +58,11 @@ impl CollecterBuilder { } /// Build a collector using the raw events. - pub async fn build(self, raw_events: FlatS3EventMessages) -> Collecter { + pub async fn build(self, raw_events: FlatS3EventMessages, config: &Config) -> Collecter<'_> { if let Some(s3_client) = self.s3_client { - Collecter::new(s3_client, raw_events) + Collecter::new(s3_client, raw_events, config) } else { - Collecter::new(S3Client::with_defaults().await, raw_events) + Collecter::new(S3Client::with_defaults().await, raw_events, config) } } @@ -96,20 +95,21 @@ impl CollecterBuilder { } /// Build a collector by manually calling receive to obtain the raw events. 
- pub async fn build_receive(mut self) -> Result { + pub async fn build_receive(mut self, config: &Config) -> Result { let url = self.sqs_url.take(); - let url = if let Some(url) = url { - url - } else { - read_env("SQS_QUEUE_URL")? - }; + let url = Config::value_or_else(url.as_deref(), config.sqs_queue_url())?; let client = self.sqs_client.take(); if let Some(sqs_client) = &client { - Ok(self.build(Self::receive(sqs_client, &url).await?).await) + Ok(self + .build(Self::receive(sqs_client, url).await?, config) + .await) } else { Ok(self - .build(Self::receive(&SQSClient::with_defaults().await, &url).await?) + .build( + Self::receive(&SQSClient::with_defaults().await, url).await?, + config, + ) .await) } } @@ -117,20 +117,25 @@ impl CollecterBuilder { /// Collect raw events into the processed form which the database module accepts. #[derive(Debug)] -pub struct Collecter { +pub struct Collecter<'a> { client: Client, raw_events: FlatS3EventMessages, + config: &'a Config, } -impl Collecter { +impl<'a> Collecter<'a> { /// Create a new collector. - pub(crate) fn new(client: Client, raw_events: FlatS3EventMessages) -> Self { - Self { client, raw_events } + pub(crate) fn new(client: Client, raw_events: FlatS3EventMessages, config: &'a Config) -> Self { + Self { + client, + raw_events, + config, + } } /// Get the inner values. - pub fn into_inner(self) -> (Client, FlatS3EventMessages) { - (self.client, self.raw_events) + pub fn into_inner(self) -> (Client, FlatS3EventMessages, &'a Config) { + (self.client, self.raw_events, self.config) } /// Converts an AWS datetime to a standard database format. @@ -204,35 +209,19 @@ impl Collecter { .collect::>>()?, )) } - - /// Read the whether paired ingest mode should be used. 
- pub fn paired_ingest_mode() -> Result { - Ok(read_env("PAIRED_INGEST_MODE") - .ok() - .map(|value| { - if value == "1" { - Ok(true) - } else { - value.parse::() - } - }) - .transpose() - .map_err(|_| InvalidEnvironmentVariable("ingest paired mode".to_string()))? - .unwrap_or_default()) - } } #[async_trait] -impl Collect for Collecter { +impl<'a> Collect for Collecter<'a> { async fn collect(self) -> Result { - let (client, events) = self.into_inner(); + let (client, events, config) = self.into_inner(); let events = events.sort_and_dedup(); let events = Self::update_events(&client, events).await?; // Get only the known event types. let events = events.filter_known(); - if Self::paired_ingest_mode()? { + if config.paired_ingest_mode() { Ok(EventSourceType::S3Paired(events.into())) } else { Ok(EventSourceType::S3(TransposedS3EventMessages::from(events))) @@ -297,7 +286,7 @@ pub(crate) mod tests { .with_sqs_client(sqs_client) .with_s3_client(s3_client) .with_sqs_url("url") - .build_receive() + .build_receive(&Default::default()) .await .unwrap() .collect() @@ -319,7 +308,8 @@ pub(crate) mod tests { #[tokio::test] async fn head() { - let mut collecter = test_collecter().await; + let config = Default::default(); + let mut collecter = test_collecter(&config).await; set_s3_client_expectations(&mut collecter.client, vec![|| Ok(expected_head_object())]); @@ -329,7 +319,8 @@ pub(crate) mod tests { #[tokio::test] async fn head_not_found() { - let mut collecter = test_collecter().await; + let config = Default::default(); + let mut collecter = test_collecter(&config).await; set_s3_client_expectations( &mut collecter.client, @@ -342,7 +333,8 @@ pub(crate) mod tests { #[tokio::test] async fn update_events() { - let mut collecter = test_collecter().await; + let config = Default::default(); + let mut collecter = test_collecter(&config).await; let events = expected_flat_events_simple().sort_and_dedup(); @@ -365,7 +357,8 @@ pub(crate) mod tests { #[tokio::test] async fn collect() { - 
let mut collecter = test_collecter().await; + let config = Default::default(); + let mut collecter = test_collecter(&config).await; set_s3_client_expectations(&mut collecter.client, vec![|| Ok(expected_head_object())]); @@ -429,8 +422,8 @@ pub(crate) mod tests { ) } - async fn test_collecter() -> Collecter { - Collecter::new(Client::default(), expected_flat_events_simple()) + async fn test_collecter(config: &Config) -> Collecter<'_> { + Collecter::new(Client::default(), expected_flat_events_simple(), config) } fn expected_receive_message() -> ReceiveMessageOutput { diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/handlers/aws.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/handlers/aws.rs index 7fe04371a..05ea1e430 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/handlers/aws.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/handlers/aws.rs @@ -7,7 +7,7 @@ use aws_lambda_events::sqs::SqsEvent; use itertools::Itertools; use lambda_runtime::Error; use mockall_double::double; -use sqlx::PgPool; +use sea_orm::DatabaseConnection; use tracing::{debug, trace}; #[double] @@ -17,7 +17,8 @@ use crate::clients::aws::sqs::Client as SQSClient; use crate::database::aws::credentials::IamGeneratorBuilder; use crate::database::aws::query::Query; use crate::database::{Client, Ingest}; -use crate::events::aws::collecter::{Collecter, CollecterBuilder}; +use crate::env::Config as EnvConfig; +use crate::events::aws::collecter::CollecterBuilder; use crate::events::aws::inventory::{DiffMessages, Inventory, Manifest}; use crate::events::aws::message::EventType::Created; use crate::events::aws::{FlatS3EventMessages, TransposedS3EventMessages}; @@ -25,17 +26,18 @@ use crate::events::{Collect, EventSourceType}; /// Handle SQS events by manually calling the SQS receive function. This is meant /// to be run through something like API gateway to manually invoke ingestion. 
-pub async fn receive_and_ingest( +pub async fn receive_and_ingest<'a>( s3_client: S3Client, sqs_client: SQSClient, sqs_url: Option>, - database_client: Client<'_>, -) -> Result { + database_client: Client<'a>, + env_config: &EnvConfig, +) -> Result, Error> { let events = CollecterBuilder::default() .with_s3_client(s3_client) .with_sqs_client(sqs_client) .set_sqs_url(sqs_url) - .build_receive() + .build_receive(env_config) .await? .collect() .await?; @@ -45,11 +47,12 @@ pub async fn receive_and_ingest( } /// Handle SQS events that go through an SqsEvent. -pub async fn ingest_event( +pub async fn ingest_event<'a>( event: SqsEvent, s3_client: S3Client, - database_client: Client<'_>, -) -> Result { + database_client: Client<'a>, + env_config: &EnvConfig, +) -> Result, Error> { trace!("received event: {:?}", event); let events: FlatS3EventMessages = event @@ -68,7 +71,7 @@ pub async fn ingest_event( let events = CollecterBuilder::default() .with_s3_client(s3_client) - .build(events) + .build(events, env_config) .await .collect() .await?; @@ -80,14 +83,15 @@ pub async fn ingest_event( } /// Handle an S3 inventory for ingestion. -pub async fn ingest_s3_inventory( +pub async fn ingest_s3_inventory<'a>( s3_client: S3Client, - database_client: Client<'_>, + database_client: Client<'a>, bucket: Option, key: Option, manifest: Option, -) -> Result { - if Collecter::paired_ingest_mode()? { + env_config: &EnvConfig, +) -> Result, Error> { + if env_config.paired_ingest_mode() { return Err(Error::from( "paired ingest mode is not supported for S3 inventory".to_string(), )); @@ -164,16 +168,29 @@ pub async fn ingest_s3_inventory( } /// Create a postgres database pool using an IAM credential generator. -pub async fn create_database_pool() -> Result { - Ok(Client::create_pool(Some(IamGeneratorBuilder::default().build().await?)).await?) 
+pub async fn create_database_pool(env_config: &EnvConfig) -> Result { + Ok(Client::create_pool( + Some(IamGeneratorBuilder::default().build(env_config).await?), + env_config, + ) + .await?) } /// Update connection options with new credentials. /// Todo, replace this with sqlx `before_connect` once it is implemented. -pub async fn update_credentials(pool: &PgPool) -> Result<(), Error> { - pool.set_connect_options( - Client::connect_options(Some(IamGeneratorBuilder::default().build().await?)).await?, - ); +pub async fn update_credentials( + connection: &DatabaseConnection, + env_config: &EnvConfig, +) -> Result<(), Error> { + connection + .get_postgres_connection_pool() + .set_connect_options( + Client::pg_connect_options( + Some(IamGeneratorBuilder::default().build(env_config).await?), + env_config, + ) + .await?, + ); Ok(()) } @@ -205,6 +222,7 @@ mod tests { }; use crate::events::aws::FlatS3EventMessage; use crate::events::EventSourceType::S3; + use sqlx::PgPool; use super::*; @@ -216,9 +234,15 @@ mod tests { set_sqs_client_expectations(&mut sqs_client); set_s3_client_expectations(&mut s3_client, vec![|| Ok(expected_head_object())]); - let ingester = receive_and_ingest(s3_client, sqs_client, Some("url"), Client::new(pool)) - .await - .unwrap(); + let ingester = receive_and_ingest( + s3_client, + sqs_client, + Some("url"), + Client::from_pool(pool), + &Default::default(), + ) + .await + .unwrap(); let (object_results, s3_object_results) = fetch_results(&ingester).await; @@ -256,9 +280,14 @@ mod tests { }], }; - let ingester = ingest_event(event, s3_client, Client::new(pool)) - .await - .unwrap(); + let ingester = ingest_event( + event, + s3_client, + Client::from_pool(pool), + &Default::default(), + ) + .await + .unwrap(); let (object_results, s3_object_results) = fetch_results(&ingester).await; @@ -294,10 +323,11 @@ mod tests { let ingester = ingest_s3_inventory( client, - Client::new(pool.clone()), + Client::from_pool(pool.clone()), 
Some(MANIFEST_BUCKET.to_string()), Some("manifest.json".to_string()), None, + &Default::default(), ) .await .unwrap(); @@ -365,10 +395,11 @@ mod tests { ingest_s3_inventory( client, - Client::new(pool.clone()), + Client::from_pool(pool.clone()), Some(MANIFEST_BUCKET.to_string()), Some("manifest.json".to_string()), None, + &Default::default(), ) .await .unwrap(); @@ -429,10 +460,11 @@ mod tests { ingest_s3_inventory( client, - Client::new(pool.clone()), + Client::from_pool(pool.clone()), Some(MANIFEST_BUCKET.to_string()), Some("manifest.json".to_string()), None, + &Default::default(), ) .await .unwrap(); diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/lib.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/lib.rs index 03d6f34c2..d3b5aa2f7 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/lib.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/lib.rs @@ -4,9 +4,8 @@ pub mod clients; pub mod database; +pub mod env; pub mod error; pub mod events; pub mod handlers; pub mod uuid; -pub mod env; - From 0e4fc3343610dc45e7f8319d9d68d0853fb8cd4b Mon Sep 17 00:00:00 2001 From: Marko Malenic Date: Fri, 21 Jun 2024 16:21:21 +1000 Subject: [PATCH 03/12] refactor(filemanager): rename object table and remove reference from database client. 
--- .../stateless/stacks/filemanager/Cargo.lock | 75 +++++++++++++++++++ .../0003_rename_object_to_group.sql | 1 + ...t_objects.sql => insert_object_groups.sql} | 2 +- .../filemanager-build/src/entities.rs | 2 + .../filemanager-http-lambda/src/main.rs | 2 +- .../filemanager-ingest-lambda/src/main.rs | 2 +- .../filemanager-inventory-lambda/src/main.rs | 2 +- .../filemanager-migrate-lambda/src/main.rs | 2 +- .../stacks/filemanager/filemanager/Cargo.toml | 1 + .../filemanager/src/database/aws/ingester.rs | 22 +++--- .../src/database/aws/ingester_paired.rs | 18 +++-- .../filemanager/src/database/aws/migration.rs | 14 ++-- .../filemanager/src/database/aws/query.rs | 8 +- .../filemanager/src/database/mod.rs | 42 ++++------- .../filemanager/src/handlers/aws.rs | 18 ++--- 15 files changed, 142 insertions(+), 69 deletions(-) create mode 100644 lib/workload/stateless/stacks/filemanager/database/migrations/0003_rename_object_to_group.sql rename lib/workload/stateless/stacks/filemanager/database/queries/ingester/{insert_objects.sql => insert_object_groups.sql} (62%) diff --git a/lib/workload/stateless/stacks/filemanager/Cargo.lock b/lib/workload/stateless/stacks/filemanager/Cargo.lock index e114b5430..fac488dfe 100644 --- a/lib/workload/stateless/stacks/filemanager/Cargo.lock +++ b/lib/workload/stateless/stacks/filemanager/Cargo.lock @@ -1051,6 +1051,61 @@ dependencies = [ "serde_with", ] +[[package]] +name = "axum" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a6c9af12842a67734c9a2e355436e5d03b22383ed60cf13cd0c18fbfe3dcbcf" +dependencies = [ + "async-trait", + "axum-core", + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", + "hyper 1.3.1", + "hyper-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sync_wrapper 1.0.1", + "tokio", + "tower", + 
"tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "axum-core" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a15c63fd72d41492dc4f497196f5da1fb04fb7529e631d73630d1b491e47a2e3" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper 0.1.2", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "backtrace" version = "0.3.71" @@ -1836,6 +1891,7 @@ dependencies = [ "aws-smithy-mocks-experimental", "aws-smithy-runtime-api", "aws_lambda_events", + "axum", "bytes", "chrono", "csv", @@ -2423,6 +2479,7 @@ dependencies = [ "http 1.1.0", "http-body 1.0.0", "httparse", + "httpdate", "itoa", "pin-project-lite", "smallvec", @@ -2867,6 +2924,12 @@ dependencies = [ "regex-automata 0.1.10", ] +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "md-5" version = "0.10.6" @@ -4845,6 +4908,18 @@ dependencies = [ "syn 2.0.65", ] +[[package]] +name = "sync_wrapper" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" + +[[package]] +name = "sync_wrapper" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394" + [[package]] name = "tap" version = "1.0.1" diff --git a/lib/workload/stateless/stacks/filemanager/database/migrations/0003_rename_object_to_group.sql b/lib/workload/stateless/stacks/filemanager/database/migrations/0003_rename_object_to_group.sql new file mode 100644 index 000000000..05013761f --- /dev/null +++ 
b/lib/workload/stateless/stacks/filemanager/database/migrations/0003_rename_object_to_group.sql @@ -0,0 +1 @@ +alter table object rename to object_group; diff --git a/lib/workload/stateless/stacks/filemanager/database/queries/ingester/insert_objects.sql b/lib/workload/stateless/stacks/filemanager/database/queries/ingester/insert_object_groups.sql similarity index 62% rename from lib/workload/stateless/stacks/filemanager/database/queries/ingester/insert_objects.sql rename to lib/workload/stateless/stacks/filemanager/database/queries/ingester/insert_object_groups.sql index 2049654ef..121028ce5 100644 --- a/lib/workload/stateless/stacks/filemanager/database/queries/ingester/insert_objects.sql +++ b/lib/workload/stateless/stacks/filemanager/database/queries/ingester/insert_object_groups.sql @@ -1,5 +1,5 @@ -- Bulk insert of objects -insert into object (object_id) +insert into object_group (object_id) values ( unnest($1::uuid[]) ); diff --git a/lib/workload/stateless/stacks/filemanager/filemanager-build/src/entities.rs b/lib/workload/stateless/stacks/filemanager/filemanager-build/src/entities.rs index 04235b68f..d64c4d8b8 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager-build/src/entities.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager-build/src/entities.rs @@ -19,6 +19,8 @@ pub async fn generate_entities() -> Result<()> { "sea-orm-cli", "generate", "entity", + "--with-serde", + "both", "-u", &config.database_url, "-o", diff --git a/lib/workload/stateless/stacks/filemanager/filemanager-http-lambda/src/main.rs b/lib/workload/stateless/stacks/filemanager/filemanager-http-lambda/src/main.rs index 9a97abc6d..035ccc17b 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager-http-lambda/src/main.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager-http-lambda/src/main.rs @@ -21,7 +21,7 @@ async fn main() -> Result<(), Error> { S3Client::with_defaults().await, SQSClient::with_defaults().await, None::, - 
DbClient::from_ref(options), + DbClient::new(options.clone()), config, ) .await?; diff --git a/lib/workload/stateless/stacks/filemanager/filemanager-ingest-lambda/src/main.rs b/lib/workload/stateless/stacks/filemanager/filemanager-ingest-lambda/src/main.rs index ada051b7b..a50033e02 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager-ingest-lambda/src/main.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager-ingest-lambda/src/main.rs @@ -19,7 +19,7 @@ async fn main() -> Result<(), Error> { ingest_event( event.payload, Client::with_defaults().await, - DbClient::from_ref(options), + DbClient::new(options.clone()), config, ) .await?; diff --git a/lib/workload/stateless/stacks/filemanager/filemanager-inventory-lambda/src/main.rs b/lib/workload/stateless/stacks/filemanager/filemanager-inventory-lambda/src/main.rs index 9c1b6cf42..719d6eff6 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager-inventory-lambda/src/main.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager-inventory-lambda/src/main.rs @@ -33,7 +33,7 @@ async fn main() -> Result<(), Error> { update_credentials(options, config).await?; let client = Client::with_defaults().await; - let database = DbClient::from_ref(options); + let database = DbClient::new(options.clone()); match event.payload { Request::BucketKey(bucket_key) => { diff --git a/lib/workload/stateless/stacks/filemanager/filemanager-migrate-lambda/src/main.rs b/lib/workload/stateless/stacks/filemanager/filemanager-migrate-lambda/src/main.rs index a412827a3..cdf757396 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager-migrate-lambda/src/main.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager-migrate-lambda/src/main.rs @@ -49,7 +49,7 @@ async fn main() -> Result<(), Error> { _ => { // If there's nothing to migrate, then this will just return Ok. 
Ok::<_, Error>( - Migration::new(DbClient::from_ref(options)) + Migration::new(DbClient::new(options.clone())) .migrate() .await?, ) diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/Cargo.toml b/lib/workload/stateless/stacks/filemanager/filemanager/Cargo.toml index 169c32435..9753b7328 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/Cargo.toml +++ b/lib/workload/stateless/stacks/filemanager/filemanager/Cargo.toml @@ -24,6 +24,7 @@ tracing-subscriber = { version = "0.3", default-features = false, features = ["f sqlx = { version = "0.7", features = ["postgres", "runtime-tokio", "tls-rustls", "chrono", "uuid"] } sea-orm = { version = "0.12", features = ["sqlx-postgres", "runtime-tokio-rustls", "macros", "sea-orm-internal"] } +axum = "0.7" envy = "0.4" chrono = { version = "0.4", features = ["serde"] } thiserror = "1" diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/ingester.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/ingester.rs index 57a37d9c7..d4da2f212 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/ingester.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/ingester.rs @@ -14,8 +14,8 @@ use crate::events::aws::{StorageClass, TransposedS3EventMessages}; /// An ingester for S3 events. #[derive(Debug)] -pub struct Ingester<'a> { - pub(crate) client: Client<'a>, +pub struct Ingester { + pub(crate) client: Client, } /// The type representing an insert query. @@ -25,9 +25,9 @@ struct Insert { number_duplicate_events: i64, } -impl<'a> Ingester<'a> { +impl Ingester { /// Create a new ingester. 
- pub fn new(client: Client<'a>) -> Self { + pub fn new(client: Client) -> Self { Self { client } } @@ -104,7 +104,7 @@ impl<'a> Ingester<'a> { ); query_file!( - "../database/queries/ingester/insert_objects.sql", + "../database/queries/ingester/insert_object_groups.sql", &object_ids, ) .execute(&mut *tx) @@ -1306,7 +1306,9 @@ pub(crate) mod tests { row_asserts(s3_object_results); // Clean up for next permutation. - pool.execute("truncate s3_object, object").await.unwrap(); + pool.execute("truncate s3_object, object_group") + .await + .unwrap(); } println!( @@ -1376,9 +1378,9 @@ pub(crate) mod tests { events } - pub(crate) async fn fetch_results<'a>(client: &Client<'a>) -> (Vec, Vec) { + pub(crate) async fn fetch_results<'a>(client: &Client) -> (Vec, Vec) { ( - sqlx::query("select * from object") + sqlx::query("select * from object_group") .fetch_all(client.pool()) .await .unwrap(), @@ -1389,7 +1391,7 @@ pub(crate) mod tests { ) } - pub(crate) async fn fetch_results_ordered<'a>(client: &Client<'a>) -> Vec { + pub(crate) async fn fetch_results_ordered<'a>(client: &Client) -> Vec { sqlx::query("select * from s3_object order by sequencer, key, version_id") .fetch_all(client.pool()) .await @@ -1529,7 +1531,7 @@ pub(crate) mod tests { update_test_events(expected_events_simple_delete_marker()) } - pub(crate) fn test_ingester<'a>(pool: PgPool) -> Client<'a> { + pub(crate) fn test_ingester(pool: PgPool) -> Client { Client::from_pool(pool) } } diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/ingester_paired.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/ingester_paired.rs index f491e9919..81db6d78b 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/ingester_paired.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/ingester_paired.rs @@ -19,8 +19,8 @@ use crate::uuid::UuidGenerator; /// An ingester for S3 events. 
#[derive(Debug)] -pub struct IngesterPaired<'a> { - client: Client<'a>, +pub struct IngesterPaired { + client: Client, } /// The type representing an insert query. @@ -30,9 +30,9 @@ struct Insert { number_duplicate_events: i64, } -impl<'a> IngesterPaired<'a> { +impl IngesterPaired { /// Create a new ingester. - pub fn new(client: Client<'a>) -> Self { + pub fn new(client: Client) -> Self { Self { client } } @@ -179,7 +179,7 @@ impl<'a> IngesterPaired<'a> { ); query_file!( - "../database/queries/ingester/insert_objects.sql", + "../database/queries/ingester/insert_object_groups.sql", &object_ids, ) .execute(&mut *tx) @@ -240,7 +240,7 @@ impl<'a> IngesterPaired<'a> { ); query_file!( - "../database/queries/ingester/insert_objects.sql", + "../database/queries/ingester/insert_object_groups.sql", &object_ids, ) .execute(&mut *tx) @@ -1568,7 +1568,9 @@ pub(crate) mod tests { row_asserts(s3_object_results); // Clean up for next permutation. - pool.execute("truncate s3_object, object").await.unwrap(); + pool.execute("truncate s3_object, object_group") + .await + .unwrap(); } println!( @@ -1809,7 +1811,7 @@ pub(crate) mod tests { events } - pub(crate) fn test_ingester<'a>(pool: PgPool) -> Client<'a> { + pub(crate) fn test_ingester(pool: PgPool) -> Client { Client::from_pool(pool) } } diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/migration.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/migration.rs index 4228c292c..6b0530f64 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/migration.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/migration.rs @@ -13,13 +13,13 @@ use crate::error::Result; /// A struct to perform database migrations. #[derive(Debug)] -pub struct Migration<'a> { - client: Client<'a>, +pub struct Migration { + client: Client, } -impl<'a> Migration<'a> { +impl Migration { /// Create a new migration. 
- pub fn new(client: Client<'a>) -> Self { + pub fn new(client: Client) -> Self { Self { client } } @@ -43,7 +43,7 @@ impl<'a> Migration<'a> { } #[async_trait] -impl<'a> Migrate for Migration<'a> { +impl Migrate for Migration { async fn migrate(&self) -> Result<()> { trace!("applying migrations"); Self::migrator() @@ -69,7 +69,7 @@ pub(crate) mod tests { let migrate = Migration::new(Client::from_pool(pool)); let not_exists = sqlx::query!( - "select exists (select from information_schema.tables where table_name = 'object')" + "select exists (select from information_schema.tables where table_name = 'object_group')" ) .fetch_one(migrate.client.pool()) .await @@ -80,7 +80,7 @@ pub(crate) mod tests { migrate.migrate().await.unwrap(); let exists = sqlx::query!( - "select exists (select from information_schema.tables where table_name = 'object')" + "select exists (select from information_schema.tables where table_name = 'object_group')" ) .fetch_one(migrate.client.pool()) .await diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/query.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/query.rs index be55e7f8c..568f89908 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/query.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/query.rs @@ -7,17 +7,17 @@ use crate::events::aws::{FlatS3EventMessage, FlatS3EventMessages, StorageClass}; /// Query the filemanager via REST interface. #[derive(Debug)] -pub struct Query<'a> { - client: Client<'a>, +pub struct Query { + client: Client, } pub struct QueryResults { _results: Vec, // FIXME: Adjust return type } -impl<'a> Query<'a> { +impl Query { /// Creates a new filemanager query client. 
- pub fn new(client: Client<'a>) -> Self { + pub fn new(client: Client) -> Self { Self { client } } diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/mod.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/mod.rs index c861bcb7b..dbe474198 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/mod.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/mod.rs @@ -1,8 +1,6 @@ //! This module handles connecting to the filemanager database for actions such as ingesting events. //! -use std::borrow::Cow; - use crate::database::aws::ingester::Ingester; use crate::database::aws::ingester_paired::IngesterPaired; use crate::env::Config; @@ -29,24 +27,15 @@ pub trait CredentialGenerator { /// A database client handles database interaction. #[derive(Debug, Clone)] -pub struct Client<'a> { +pub struct Client { // Use a Cow here to allow an owned pool or a shared reference to a pool. - connection: Cow<'a, DatabaseConnection>, + connection: DatabaseConnection, } -impl<'a> Client<'a> { +impl Client { /// Create a database from an existing pool. pub fn new(connection: DatabaseConnection) -> Self { - Self { - connection: Cow::Owned(connection), - } - } - - /// Create a database from a reference to an existing pool. - pub fn from_ref(connection: &'a DatabaseConnection) -> Self { - Self { - connection: Cow::Borrowed(connection), - } + Self { connection } } /// Create a database connection from an existing pool. @@ -118,23 +107,24 @@ impl<'a> Client<'a> { self.connection.get_postgres_connection_pool() } - /// Get the database connection. - pub fn connection(&self) -> &DatabaseConnection { - &self.connection + /// Get the database connection. Clones the underlying `DatabaseConnection` which is + /// intended to be cheaply cloneable because it represents an Arc to a shared connection pool. 
+ pub fn connection(&self) -> DatabaseConnection { + self.connection.clone() } } #[async_trait] -impl<'a> Ingest<'a> for Client<'a> { - async fn ingest(&'a self, events: EventSourceType) -> Result<()> { +impl Ingest for Client { + async fn ingest(&self, events: EventSourceType) -> Result<()> { match events { EventSourceType::S3(events) => { - Ingester::new(Self::from_ref(self.connection())) + Ingester::new(Self::new(self.connection())) .ingest_events(events) .await } EventSourceType::S3Paired(events) => { - IngesterPaired::new(Self::from_ref(self.connection())) + IngesterPaired::new(Self::new(self.connection())) .ingest_events(events) .await } @@ -144,9 +134,9 @@ impl<'a> Ingest<'a> for Client<'a> { /// This trait ingests raw events into the database. #[async_trait] -pub trait Ingest<'a> { +pub trait Ingest { /// Ingest the events. - async fn ingest(&'a self, events: EventSourceType) -> Result<()>; + async fn ingest(&self, events: EventSourceType) -> Result<()>; } /// Trait representing database migrations. @@ -196,7 +186,7 @@ pub(crate) mod tests { .unwrap(); query_file!( - "../database/queries/ingester/insert_objects.sql", + "../database/queries/ingester/insert_object_groups.sql", &vec![object_id], ) .fetch_all(&mut *tx) @@ -258,7 +248,7 @@ pub(crate) mod tests { .unwrap(); query_file!( - "../database/queries/ingester/insert_objects.sql", + "../database/queries/ingester/insert_object_groups.sql", &vec![object_id], ) .fetch_all(&mut *tx) diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/handlers/aws.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/handlers/aws.rs index 05ea1e430..f2e9cab9d 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/handlers/aws.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/handlers/aws.rs @@ -26,13 +26,13 @@ use crate::events::{Collect, EventSourceType}; /// Handle SQS events by manually calling the SQS receive function. 
This is meant /// to be run through something like API gateway to manually invoke ingestion. -pub async fn receive_and_ingest<'a>( +pub async fn receive_and_ingest( s3_client: S3Client, sqs_client: SQSClient, sqs_url: Option>, - database_client: Client<'a>, + database_client: Client, env_config: &EnvConfig, -) -> Result, Error> { +) -> Result { let events = CollecterBuilder::default() .with_s3_client(s3_client) .with_sqs_client(sqs_client) @@ -47,12 +47,12 @@ pub async fn receive_and_ingest<'a>( } /// Handle SQS events that go through an SqsEvent. -pub async fn ingest_event<'a>( +pub async fn ingest_event( event: SqsEvent, s3_client: S3Client, - database_client: Client<'a>, + database_client: Client, env_config: &EnvConfig, -) -> Result, Error> { +) -> Result { trace!("received event: {:?}", event); let events: FlatS3EventMessages = event @@ -83,14 +83,14 @@ pub async fn ingest_event<'a>( } /// Handle an S3 inventory for ingestion. -pub async fn ingest_s3_inventory<'a>( +pub async fn ingest_s3_inventory( s3_client: S3Client, - database_client: Client<'a>, + database_client: Client, bucket: Option, key: Option, manifest: Option, env_config: &EnvConfig, -) -> Result, Error> { +) -> Result { if env_config.paired_ingest_mode() { return Err(Error::from( "paired ingest mode is not supported for S3 inventory".to_string(), From 521bf62a1bdcef59abb6e3e8eb24f875e9161a8f Mon Sep 17 00:00:00 2001 From: Marko Malenic Date: Mon, 24 Jun 2024 10:03:08 +1000 Subject: [PATCH 04/12] feat(filemanager): add list operation for object groups --- .../filemanager/src/database/mod.rs | 5 ++ .../stacks/filemanager/filemanager/src/lib.rs | 2 + .../filemanager/src/queries/list.rs | 27 +++++++++++ .../filemanager/src/queries/mod.rs | 4 ++ .../filemanager/src/routes/list.rs | 21 +++++++++ .../filemanager/filemanager/src/routes/mod.rs | 47 +++++++++++++++++++ 6 files changed, 106 insertions(+) create mode 100644 lib/workload/stateless/stacks/filemanager/filemanager/src/queries/list.rs create mode 
100644 lib/workload/stateless/stacks/filemanager/filemanager/src/queries/mod.rs create mode 100644 lib/workload/stateless/stacks/filemanager/filemanager/src/routes/list.rs create mode 100644 lib/workload/stateless/stacks/filemanager/filemanager/src/routes/mod.rs diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/mod.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/mod.rs index dbe474198..9ee9e2b70 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/mod.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/mod.rs @@ -112,6 +112,11 @@ impl Client { pub fn connection(&self) -> DatabaseConnection { self.connection.clone() } + + /// Get the database connection as a reference. + pub fn connection_ref(&self) -> &DatabaseConnection { + &self.connection + } } #[async_trait] diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/lib.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/lib.rs index d3b5aa2f7..1eee5ba5e 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/lib.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/lib.rs @@ -8,4 +8,6 @@ pub mod env; pub mod error; pub mod events; pub mod handlers; +pub mod queries; +pub mod routes; pub mod uuid; diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/list.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/list.rs new file mode 100644 index 000000000..f895034b7 --- /dev/null +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/list.rs @@ -0,0 +1,27 @@ +//! Query builder involving list operations on the database. +//! + +use crate::database::entities::object_group::Entity as ObjectGroupEntity; +use crate::database::entities::object_group::Model as ObjectGroup; +use crate::database::Client; +use crate::error::Result; +use sea_orm::EntityTrait; + +/// A query builder for list operations. 
+pub struct ListQueryBuilder<'a> { + client: &'a Client, +} + +impl<'a> ListQueryBuilder<'a> { + /// Create a new query builder. + pub fn new(client: &'a Client) -> Self { + Self { client } + } + + /// Find all object groups. + pub async fn list_object_groups(&self) -> Result> { + Ok(ObjectGroupEntity::find() + .all(self.client.connection_ref()) + .await?) + } +} diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/mod.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/mod.rs new file mode 100644 index 000000000..bea1cce61 --- /dev/null +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/mod.rs @@ -0,0 +1,4 @@ +//! This module handles all logic related to querying the file manager through APIs/events. +//! + +pub mod list; diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/list.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/list.rs new file mode 100644 index 000000000..c2e162478 --- /dev/null +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/list.rs @@ -0,0 +1,21 @@ +//! Route logic for list API calls. +//! + +use crate::database::entities::object_group::Model as ObjectGroup; +use crate::error::Result; +use crate::queries::list::ListQueryBuilder; +use crate::routes::AppState; +use axum::extract::State; +use axum::Json; +use serde::Deserialize; + +/// Params for a list object request. +#[derive(Debug, Deserialize)] +pub struct ListObjectGroupsParams {} + +/// The list object handler. 
+pub async fn list_object_groups(state: State) -> Result>> { + let query = ListQueryBuilder::new(&state.client); + + Ok(Json(query.list_object_groups().await?)) +} diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/mod.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/mod.rs new file mode 100644 index 000000000..f39aa6003 --- /dev/null +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/mod.rs @@ -0,0 +1,47 @@ +//! This module handles API routing. +//! + +pub mod list; + +use crate::database::Client; +use crate::error::Error; +use crate::routes::list::list_object_groups; +use axum::http::StatusCode; +use axum::response::{IntoResponse, Response}; +use axum::routing::get; +use axum::{Json, Router}; +use serde::Serialize; + +#[derive(Debug, Clone)] +pub struct AppState { + client: Client, +} + +/// The main filemanager router for query read-only requests. +pub fn query_router(database: Client) -> Router { + let state = AppState { client: database }; + + Router::new() + .route("/objects", get(list_object_groups)) + .with_state(state) +} + +/// The error response format returned in the API.
+#[derive(Serialize)] +pub struct ErrorResponse { + message: String, +} + +impl IntoResponse for Error { + fn into_response(self) -> Response { + let (status, message) = match self { + Error::SQLError(err) => (StatusCode::NOT_FOUND, err.to_string()), + _ => ( + StatusCode::INTERNAL_SERVER_ERROR, + "unexpected error".to_string(), + ), + }; + + (status, Json(ErrorResponse { message })).into_response() + } +} From 963eb526e6c3ce103f48dba2e5506ce4ff25531d Mon Sep 17 00:00:00 2001 From: Marko Malenic Date: Mon, 24 Jun 2024 13:20:45 +1000 Subject: [PATCH 05/12] feat(filemanager): add list operation for s3 objects --- .../filemanager/src/queries/list.rs | 23 +++++++++++++++++-- .../filemanager/src/routes/list.rs | 16 +++++++++++-- .../filemanager/filemanager/src/routes/mod.rs | 3 ++- 3 files changed, 37 insertions(+), 5 deletions(-) diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/list.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/list.rs index f895034b7..8e2ae55af 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/list.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/list.rs @@ -3,9 +3,11 @@ use crate::database::entities::object_group::Entity as ObjectGroupEntity; use crate::database::entities::object_group::Model as ObjectGroup; +use crate::database::entities::s3_object::Entity as S3ObjectEntity; +use crate::database::entities::s3_object::Model as S3Object; use crate::database::Client; use crate::error::Result; -use sea_orm::EntityTrait; +use sea_orm::{EntityTrait, Select}; /// A query builder for list operations. pub struct ListQueryBuilder<'a> { @@ -18,9 +20,26 @@ impl<'a> ListQueryBuilder<'a> { Self { client } } + /// Build a select query for finding values from object groups. + pub fn build_object_group() -> Select { + ObjectGroupEntity::find() + } + + /// Build a select query for finding values from s3 objects. 
+ pub fn build_s3_object() -> Select { + S3ObjectEntity::find() + } + /// Find all object groups. pub async fn list_object_groups(&self) -> Result> { - Ok(ObjectGroupEntity::find() + Ok(Self::build_object_group() + .all(self.client.connection_ref()) + .await?) + } + + /// Find all s3 objects. + pub async fn list_s3_objects(&self) -> Result> { + Ok(Self::build_s3_object() .all(self.client.connection_ref()) .await?) } diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/list.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/list.rs index c2e162478..03e34cae8 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/list.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/list.rs @@ -2,6 +2,7 @@ //! use crate::database::entities::object_group::Model as ObjectGroup; +use crate::database::entities::s3_object::Model as S3Object; use crate::error::Result; use crate::queries::list::ListQueryBuilder; use crate::routes::AppState; @@ -9,13 +10,24 @@ use axum::extract::State; use axum::Json; use serde::Deserialize; -/// Params for a list object request. +/// Params for a list object groups request. #[derive(Debug, Deserialize)] pub struct ListObjectGroupsParams {} -/// The list object handler. +/// The list object groups handler. pub async fn list_object_groups(state: State) -> Result>> { let query = ListQueryBuilder::new(&state.client); Ok(Json(query.list_object_groups().await?)) } + +/// Params for a list s3 objects request. +#[derive(Debug, Deserialize)] +pub struct ListS3ObjectsParams {} + +/// The list s3 objects handler. 
+pub async fn list_s3_objects(state: State) -> Result>> { + let query = ListQueryBuilder::new(&state.client); + + Ok(Json(query.list_s3_objects().await?)) +} diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/mod.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/mod.rs index f39aa6003..23f667737 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/mod.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/mod.rs @@ -5,7 +5,7 @@ pub mod list; use crate::database::Client; use crate::error::Error; -use crate::routes::list::list_object_groups; +use crate::routes::list::{list_object_groups, list_s3_objects}; use axum::http::StatusCode; use axum::response::{IntoResponse, Response}; use axum::routing::get; @@ -23,6 +23,7 @@ pub fn query_router(database: Client) -> Router { Router::new() .route("/objects", get(list_object_groups)) + .route("/s3_objects", get(list_s3_objects)) .with_state(state) } From 0894bee653e3237b24db2177696025286ce23072 Mon Sep 17 00:00:00 2001 From: Marko Malenic Date: Mon, 24 Jun 2024 13:47:51 +1000 Subject: [PATCH 06/12] feat(filemanager): add get by id requests --- .../filemanager/src/queries/get.rs | 47 +++++++++++++++++++ .../filemanager/src/queries/mod.rs | 1 + .../filemanager/filemanager/src/routes/get.rs | 40 ++++++++++++++++ .../filemanager/filemanager/src/routes/mod.rs | 6 ++- 4 files changed, 93 insertions(+), 1 deletion(-) create mode 100644 lib/workload/stateless/stacks/filemanager/filemanager/src/queries/get.rs create mode 100644 lib/workload/stateless/stacks/filemanager/filemanager/src/routes/get.rs diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/get.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/get.rs new file mode 100644 index 000000000..1fe461d94 --- /dev/null +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/get.rs @@ -0,0 +1,47 @@ +//! 
Query builder involving get operations on the database. +//! + +use crate::database::entities::object_group::Entity as ObjectGroupEntity; +use crate::database::entities::object_group::Model as ObjectGroup; +use crate::database::entities::s3_object::Entity as S3ObjectEntity; +use crate::database::entities::s3_object::Model as S3Object; +use crate::database::Client; +use crate::error::Result; +use sea_orm::{EntityTrait, Select}; +use uuid::Uuid; + +/// A query builder for get operations. +pub struct GetQueryBuilder<'a> { + client: &'a Client, +} + +impl<'a> GetQueryBuilder<'a> { + /// Create a new query builder. + pub fn new(client: &'a Client) -> Self { + Self { client } + } + + /// Build a select query for finding an object group by id. + pub fn build_object_group_by_id(id: Uuid) -> Select { + ObjectGroupEntity::find_by_id(id) + } + + /// Build a select query for finding an s3 object by id. + pub fn build_s3_object_by_id(id: Uuid) -> Select { + S3ObjectEntity::find_by_id(id) + } + + /// Get a specific object group by id. + pub async fn get_object_group(&self, id: Uuid) -> Result> { + Ok(Self::build_object_group_by_id(id) + .one(self.client.connection_ref()) + .await?) + } + + /// Get a specific s3 object by id. + pub async fn get_s3_object_by_id(&self, id: Uuid) -> Result> { + Ok(Self::build_s3_object_by_id(id) + .one(self.client.connection_ref()) + .await?) + } +} diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/mod.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/mod.rs index bea1cce61..0d3ce1211 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/mod.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/mod.rs @@ -1,4 +1,5 @@ //! This module handles all logic related to querying the file manager through APIs/events. //! 
+pub mod get; pub mod list; diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/get.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/get.rs new file mode 100644 index 000000000..546dd6b88 --- /dev/null +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/get.rs @@ -0,0 +1,40 @@ +//! Route logic for get API calls. +//! + +use crate::database::entities::object_group::Model as ObjectGroup; +use crate::database::entities::s3_object::Model as S3Object; +use crate::error::Result; +use crate::queries::get::GetQueryBuilder; +use crate::routes::AppState; +use axum::extract::{Path, State}; +use axum::Json; +use serde::Deserialize; +use uuid::Uuid; + +/// Params for a get object group by id request. +#[derive(Debug, Deserialize)] +pub struct GetObjectGroupById {} + +/// The get object groups handler. +pub async fn get_object_group_by_id( + state: State, + Path(id): Path, +) -> Result>> { + let query = GetQueryBuilder::new(&state.client); + + Ok(Json(query.get_object_group(id).await?)) +} + +/// Params for a get s3 objects by id request. +#[derive(Debug, Deserialize)] +pub struct GetS3ObjectById {} + +/// The get s3 objects handler. +pub async fn get_s3_object_by_id( + state: State, + Path(id): Path, +) -> Result>> { + let query = GetQueryBuilder::new(&state.client); + + Ok(Json(query.get_s3_object_by_id(id).await?)) +} diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/mod.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/mod.rs index 23f667737..02840765b 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/mod.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/mod.rs @@ -1,10 +1,12 @@ //! This module handles API routing. //! 
+pub mod get; pub mod list; use crate::database::Client; use crate::error::Error; +use crate::routes::get::{get_object_group_by_id, get_s3_object_by_id}; use crate::routes::list::{list_object_groups, list_s3_objects}; use axum::http::StatusCode; use axum::response::{IntoResponse, Response}; @@ -22,8 +24,10 @@ pub fn query_router(database: Client) -> Router { let state = AppState { client: database }; Router::new() - .route("/objects", get(list_object_groups)) + .route("/object_groups", get(list_object_groups)) + .route("/object_groups/:id", get(get_object_group_by_id)) .route("/s3_objects", get(list_s3_objects)) + .route("/s3_objects/:id", get(get_s3_object_by_id)) .with_state(state) } From cca53cec6c8d5b27efb35a2c9bde694551566589 Mon Sep 17 00:00:00 2001 From: Marko Malenic Date: Mon, 24 Jun 2024 14:33:19 +1000 Subject: [PATCH 07/12] feat(filemanager): add count operation to list calls --- .../filemanager/filemanager/src/queries/list.rs | 16 +++++++++++++++- .../filemanager/filemanager/src/routes/list.rs | 14 ++++++++++++++ .../filemanager/filemanager/src/routes/mod.rs | 6 +++++- 3 files changed, 34 insertions(+), 2 deletions(-) diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/list.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/list.rs index 8e2ae55af..c0567fea2 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/list.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/list.rs @@ -7,7 +7,7 @@ use crate::database::entities::s3_object::Entity as S3ObjectEntity; use crate::database::entities::s3_object::Model as S3Object; use crate::database::Client; use crate::error::Result; -use sea_orm::{EntityTrait, Select}; +use sea_orm::{EntityTrait, PaginatorTrait, Select}; /// A query builder for list operations. pub struct ListQueryBuilder<'a> { @@ -43,4 +43,18 @@ impl<'a> ListQueryBuilder<'a> { .all(self.client.connection_ref()) .await?) } + + /// Count object groups. 
+ pub async fn count_object_groups(&self) -> Result { + Ok(Self::build_object_group() + .count(self.client.connection_ref()) + .await?) + } + + /// Count s3 objects. + pub async fn count_s3_objects(&self) -> Result { + Ok(Self::build_s3_object() + .count(self.client.connection_ref()) + .await?) + } } diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/list.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/list.rs index 03e34cae8..88b67b652 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/list.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/list.rs @@ -21,6 +21,13 @@ pub async fn list_object_groups(state: State) -> Result) -> Result> { + let query = ListQueryBuilder::new(&state.client); + + Ok(Json(query.count_object_groups().await?)) +} + /// Params for a list s3 objects request. #[derive(Debug, Deserialize)] pub struct ListS3ObjectsParams {} @@ -31,3 +38,10 @@ pub async fn list_s3_objects(state: State) -> Result) -> Result> { + let query = ListQueryBuilder::new(&state.client); + + Ok(Json(query.count_s3_objects().await?)) +} diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/mod.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/mod.rs index 02840765b..e2b53cfca 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/mod.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/mod.rs @@ -7,7 +7,9 @@ pub mod list; use crate::database::Client; use crate::error::Error; use crate::routes::get::{get_object_group_by_id, get_s3_object_by_id}; -use crate::routes::list::{list_object_groups, list_s3_objects}; +use crate::routes::list::{ + count_object_groups, count_s3_objects, list_object_groups, list_s3_objects, +}; use axum::http::StatusCode; use axum::response::{IntoResponse, Response}; use axum::routing::get; @@ -26,8 +28,10 @@ pub fn query_router(database: Client) -> Router { 
Router::new() .route("/object_groups", get(list_object_groups)) .route("/object_groups/:id", get(get_object_group_by_id)) + .route("/object_groups/count", get(count_object_groups)) .route("/s3_objects", get(list_s3_objects)) .route("/s3_objects/:id", get(get_s3_object_by_id)) + .route("/s3_objects/count", get(count_s3_objects)) .with_state(state) } From ce774f6f4883f6fba09a7899a7c6c44d858ac084 Mon Sep 17 00:00:00 2001 From: Marko Malenic Date: Mon, 24 Jun 2024 18:08:15 +1000 Subject: [PATCH 08/12] test(filemanager): test query functions and apis --- .../stateless/stacks/filemanager/Cargo.lock | 5 + .../filemanager-build/src/entities.rs | 18 ++- .../filemanager-build/src/error.rs | 8 +- .../filemanager/filemanager-build/src/lib.rs | 13 +- .../stacks/filemanager/filemanager/Cargo.toml | 16 +- .../filemanager/src/queries/get.rs | 43 +++++- .../filemanager/src/queries/list.rs | 70 ++++++++- .../filemanager/src/queries/mod.rs | 96 ++++++++++++ .../filemanager/filemanager/src/routes/get.rs | 83 +++++++++- .../filemanager/src/routes/list.rs | 145 +++++++++++++++++- .../filemanager/filemanager/src/routes/mod.rs | 51 +++++- 11 files changed, 515 insertions(+), 33 deletions(-) diff --git a/lib/workload/stateless/stacks/filemanager/Cargo.lock b/lib/workload/stateless/stacks/filemanager/Cargo.lock index fac488dfe..aa158e871 100644 --- a/lib/workload/stateless/stacks/filemanager/Cargo.lock +++ b/lib/workload/stateless/stacks/filemanager/Cargo.lock @@ -1916,8 +1916,10 @@ dependencies = [ "serde_json", "serde_with", "sqlx", + "strum 0.26.2", "thiserror", "tokio", + "tower", "tracing", "tracing-subscriber", "url", @@ -4833,6 +4835,9 @@ name = "strum" version = "0.26.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5d8cec3501a5194c432b2b7976db6b7d10ec95c253208b45f83f7136aa985e29" +dependencies = [ + "strum_macros", +] [[package]] name = "strum_macros" diff --git a/lib/workload/stateless/stacks/filemanager/filemanager-build/src/entities.rs 
b/lib/workload/stateless/stacks/filemanager/filemanager-build/src/entities.rs index d64c4d8b8..c24fa3576 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager-build/src/entities.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager-build/src/entities.rs @@ -2,14 +2,16 @@ //! database schema. //! -use crate::error::ErrorKind::EntityGeneration; -use crate::error::{Error, Result}; -use crate::Config; +use std::ffi::OsStr; +use std::fs::write; + use clap_builder::Parser; use quote::quote; use sea_orm_cli::{run_generate_command, Cli, Commands}; -use std::ffi::OsStr; -use std::fs::write; + +use crate::error::ErrorKind::EntityGeneration; +use crate::error::{Error, Result}; +use crate::Config; pub async fn generate_entities() -> Result<()> { let config = Config::load()?; @@ -21,6 +23,12 @@ pub async fn generate_entities() -> Result<()> { "entity", "--with-serde", "both", + "--enum-extra-derives", + "strum::FromRepr", + "--enum-extra-derives", + "strum::EnumCount", + "--enum-extra-attributes", + "repr(u8)", "-u", &config.database_url, "-o", diff --git a/lib/workload/stateless/stacks/filemanager/filemanager-build/src/error.rs b/lib/workload/stateless/stacks/filemanager/filemanager-build/src/error.rs index 5006c469f..9207e77b6 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager-build/src/error.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager-build/src/error.rs @@ -1,15 +1,17 @@ //! This module contains the crate's error types. //! 
-use crate::error::ErrorKind::{IoError, LoadingEnvironment}; -use crate::workspace_path; -use miette::{diagnostic, Diagnostic, NamedSource, SourceOffset}; use std::fmt::{Display, Formatter}; use std::fs::read_to_string; use std::panic::Location; use std::{io, result}; + +use miette::{diagnostic, Diagnostic, NamedSource, SourceOffset}; use thiserror::Error; +use crate::error::ErrorKind::{IoError, LoadingEnvironment}; +use crate::workspace_path; + pub type Result = result::Result; /// Error types for the filemanager. diff --git a/lib/workload/stateless/stacks/filemanager/filemanager-build/src/lib.rs b/lib/workload/stateless/stacks/filemanager/filemanager-build/src/lib.rs index e56165a33..916ab47e0 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager-build/src/lib.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager-build/src/lib.rs @@ -1,11 +1,14 @@ -use crate::error::Error; -use crate::error::ErrorKind::LoadingEnvironment; -use envy::from_env; -use error::Result; -use serde::Deserialize; use std::env::var; use std::path::{Path, PathBuf}; +use envy::from_env; +use serde::Deserialize; + +use error::Result; + +use crate::error::Error; +use crate::error::ErrorKind::LoadingEnvironment; + pub mod entities; pub mod error; diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/Cargo.toml b/lib/workload/stateless/stacks/filemanager/filemanager/Cargo.toml index 9753b7328..5733849c2 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/Cargo.toml +++ b/lib/workload/stateless/stacks/filemanager/filemanager/Cargo.toml @@ -12,20 +12,28 @@ rust-version.workspace = true migrate = ["sqlx/migrate"] [dependencies] +# Serde serde = { version = "1", features = ["derive"] } serde_json = "1" serde_with = "3" +# Async async-trait = "0.1" futures = "0.3" tokio = { version = "1", features = ["rt-multi-thread", "macros"] } tracing = "0.1" tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt"] } +# Database sqlx = { 
version = "0.7", features = ["postgres", "runtime-tokio", "tls-rustls", "chrono", "uuid"] } sea-orm = { version = "0.12", features = ["sqlx-postgres", "runtime-tokio-rustls", "macros", "sea-orm-internal"] } +strum = { version = "0.26", features = ["derive"] } + +# Query server axum = "0.7" -envy = "0.4" +tower = "0.4" + +# General chrono = { version = "0.4", features = ["serde"] } thiserror = "1" uuid = { version = "1", features = ["v7"] } @@ -33,6 +41,10 @@ mockall = "0.12" mockall_double = "0.3" itertools = "0.12" url = "2" +bytes = "1.6" +envy = "0.4" + +# Inventory csv = "1" flate2 = "1" md5 = "0.7" @@ -41,8 +53,8 @@ parquet = { version = "50", features = ["async"] } arrow = { version = "50", features = ["chrono-tz"] } arrow-json = "50" orc-rust = "0.3" -bytes = "1.6" +# AWS aws-sdk-sqs = "1" aws-config = "1" aws-sdk-s3 = "1" diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/get.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/get.rs index 1fe461d94..e4bd8fec0 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/get.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/get.rs @@ -1,14 +1,15 @@ //! Query builder involving get operations on the database. //! +use sea_orm::{EntityTrait, Select}; +use uuid::Uuid; + use crate::database::entities::object_group::Entity as ObjectGroupEntity; use crate::database::entities::object_group::Model as ObjectGroup; use crate::database::entities::s3_object::Entity as S3ObjectEntity; use crate::database::entities::s3_object::Model as S3Object; use crate::database::Client; use crate::error::Result; -use sea_orm::{EntityTrait, Select}; -use uuid::Uuid; /// A query builder for get operations. pub struct GetQueryBuilder<'a> { @@ -45,3 +46,41 @@ impl<'a> GetQueryBuilder<'a> { .await?) 
} } + +#[cfg(test)] +mod tests { + use sqlx::PgPool; + + use crate::database::aws::migration::tests::MIGRATOR; + use crate::database::Client; + use crate::queries::tests::initialize_database; + + use super::*; + + #[sqlx::test(migrator = "MIGRATOR")] + async fn test_get_object_group(pool: PgPool) { + let client = Client::from_pool(pool); + let entries = initialize_database(&client, 10).await; + + let first = entries.first().unwrap(); + let builder = GetQueryBuilder::new(&client); + let result = builder.get_object_group(first.0.object_id).await.unwrap(); + + assert_eq!(result.as_ref(), Some(&first.0)); + } + + #[sqlx::test(migrator = "MIGRATOR")] + async fn test_list_s3_objects(pool: PgPool) { + let client = Client::from_pool(pool); + let entries = initialize_database(&client, 10).await; + + let first = entries.first().unwrap(); + let builder = GetQueryBuilder::new(&client); + let result = builder + .get_s3_object_by_id(first.1.s3_object_id) + .await + .unwrap(); + + assert_eq!(result.as_ref(), Some(&first.1)); + } +} diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/list.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/list.rs index c0567fea2..fa139ab52 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/list.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/list.rs @@ -1,13 +1,14 @@ //! Query builder involving list operations on the database. //! +use sea_orm::{EntityTrait, PaginatorTrait, Select}; + use crate::database::entities::object_group::Entity as ObjectGroupEntity; use crate::database::entities::object_group::Model as ObjectGroup; use crate::database::entities::s3_object::Entity as S3ObjectEntity; use crate::database::entities::s3_object::Model as S3Object; use crate::database::Client; use crate::error::Result; -use sea_orm::{EntityTrait, PaginatorTrait, Select}; /// A query builder for list operations. 
pub struct ListQueryBuilder<'a> { @@ -58,3 +59,70 @@ impl<'a> ListQueryBuilder<'a> { .await?) } } + +#[cfg(test)] +mod tests { + use sqlx::PgPool; + + use crate::database::aws::migration::tests::MIGRATOR; + use crate::database::Client; + use crate::queries::tests::initialize_database; + + use super::*; + + #[sqlx::test(migrator = "MIGRATOR")] + async fn test_list_object_groups(pool: PgPool) { + let client = Client::from_pool(pool); + let entries = initialize_database(&client, 10).await; + + let builder = ListQueryBuilder::new(&client); + let result = builder.list_object_groups().await.unwrap(); + + assert_eq!( + result, + entries + .into_iter() + .map(|(entry, _)| entry) + .collect::>() + ); + } + + #[sqlx::test(migrator = "MIGRATOR")] + async fn test_list_s3_objects(pool: PgPool) { + let client = Client::from_pool(pool); + let entries = initialize_database(&client, 10).await; + + let builder = ListQueryBuilder::new(&client); + let result = builder.list_s3_objects().await.unwrap(); + + assert_eq!( + result, + entries + .into_iter() + .map(|(_, entry)| entry) + .collect::>() + ); + } + + #[sqlx::test(migrator = "MIGRATOR")] + async fn test_count_object_groups(pool: PgPool) { + let client = Client::from_pool(pool); + initialize_database(&client, 10).await; + + let builder = ListQueryBuilder::new(&client); + let result = builder.count_object_groups().await.unwrap(); + + assert_eq!(result, 10); + } + + #[sqlx::test(migrator = "MIGRATOR")] + async fn test_count_s3_objects(pool: PgPool) { + let client = Client::from_pool(pool); + initialize_database(&client, 10).await; + + let builder = ListQueryBuilder::new(&client); + let result = builder.count_s3_objects().await.unwrap(); + + assert_eq!(result, 10); + } +} diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/mod.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/mod.rs index 0d3ce1211..1e33bbf40 100644 --- 
a/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/mod.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/mod.rs @@ -3,3 +3,99 @@ pub mod get; pub mod list; + +#[cfg(test)] +pub(crate) mod tests { + use std::ops::Add; + + use chrono::{DateTime, Days}; + use sea_orm::Set; + use sea_orm::{ActiveModelTrait, TryIntoModel}; + use strum::EnumCount; + + use crate::database::entities::object_group::ActiveModel as ActiveObjectGroup; + use crate::database::entities::object_group::Model as ObjectGroup; + use crate::database::entities::s3_object::ActiveModel as ActiveS3Object; + use crate::database::entities::s3_object::Model as S3Object; + use crate::database::entities::sea_orm_active_enums::{EventType, StorageClass}; + use crate::database::Client; + use crate::uuid::UuidGenerator; + + /// Initialize database state for testing. + pub(crate) async fn initialize_database( + client: &Client, + n: usize, + ) -> Vec<(ObjectGroup, S3Object)> { + let mut output = vec![]; + + for index in 0..n { + let (group, object) = generate_entry(index); + + group.clone().insert(client.connection_ref()).await.unwrap(); + object + .clone() + .insert(client.connection_ref()) + .await + .unwrap(); + + output.push(( + group.try_into_model().unwrap(), + object.try_into_model().unwrap(), + )); + } + + output + } + + pub(crate) fn generate_entry(index: usize) -> (ActiveObjectGroup, ActiveS3Object) { + let object_id = UuidGenerator::generate(); + let event = event_type(index); + let date = || Set(Some(DateTime::default().add(Days::new(index as u64)))); + + ( + ActiveObjectGroup { + object_id: Set(object_id), + attributes: Set(None), + }, + ActiveS3Object { + s3_object_id: Set(UuidGenerator::generate()), + object_id: Set(object_id), + public_id: Set(UuidGenerator::generate()), + event_type: Set(event.clone()), + // Half as many buckets as keys. 
+ bucket: Set((index / 2).to_string()), + key: Set(index.to_string()), + version_id: Set(index.to_string()), + date: date(), + size: Set(Some(index as i64)), + sha256: Set(Some(index.to_string())), + last_modified_date: date(), + e_tag: Set(Some(index.to_string())), + storage_class: Set(Some(storage_class(index))), + sequencer: Set(Some(index.to_string())), + is_delete_marker: Set(false), + number_duplicate_events: Set(0), + attributes: Set(None), + deleted_date: if event == EventType::Deleted { + date() + } else { + Set(None) + }, + deleted_sequencer: if event == EventType::Deleted { + Set(Some(index.to_string())) + } else { + Set(None) + }, + number_reordered: Set(0), + }, + ) + } + + pub(crate) fn event_type(index: usize) -> EventType { + EventType::from_repr((index % (EventType::COUNT - 1)) as u8).unwrap() + } + + pub(crate) fn storage_class(index: usize) -> StorageClass { + StorageClass::from_repr((index % StorageClass::COUNT) as u8).unwrap() + } +} diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/get.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/get.rs index 546dd6b88..569e93721 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/get.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/get.rs @@ -1,15 +1,16 @@ //! Route logic for get API calls. //! +use axum::extract::{Path, State}; +use axum::Json; +use serde::Deserialize; +use uuid::Uuid; + use crate::database::entities::object_group::Model as ObjectGroup; use crate::database::entities::s3_object::Model as S3Object; use crate::error::Result; use crate::queries::get::GetQueryBuilder; use crate::routes::AppState; -use axum::extract::{Path, State}; -use axum::Json; -use serde::Deserialize; -use uuid::Uuid; /// Params for a get object group by id request. 
#[derive(Debug, Deserialize)] @@ -38,3 +39,77 @@ pub async fn get_s3_object_by_id( Ok(Json(query.get_s3_object_by_id(id).await?)) } + +#[cfg(test)] +mod tests { + use axum::body::to_bytes; + use axum::body::Body; + use axum::http::Request; + use parquet::data_type::AsBytes; + use serde_json::from_slice; + use sqlx::PgPool; + use tower::ServiceExt; + + use crate::database::aws::migration::tests::MIGRATOR; + use crate::database::entities::object_group::Model as ObjectGroup; + use crate::database::entities::s3_object::Model as S3Object; + use crate::database::Client; + use crate::queries::tests::initialize_database; + use crate::routes::query_router; + + #[sqlx::test(migrator = "MIGRATOR")] + async fn get_object_groups_api(pool: PgPool) { + let client = Client::from_pool(pool); + let entries = initialize_database(&client, 10).await; + + let first = entries.first().unwrap(); + let app = query_router(client); + let response = app + .oneshot( + Request::builder() + .uri(format!("/object_groups/{}", first.0.object_id)) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + let result = from_slice::( + to_bytes(response.into_body(), usize::MAX) + .await + .unwrap() + .as_bytes(), + ) + .unwrap(); + + assert_eq!(result, first.0); + } + + #[sqlx::test(migrator = "MIGRATOR")] + async fn get_s3_objects_api(pool: PgPool) { + let client = Client::from_pool(pool); + let entries = initialize_database(&client, 10).await; + + let first = entries.first().unwrap(); + let app = query_router(client); + let response = app + .oneshot( + Request::builder() + .uri(format!("/s3_objects/{}", first.1.s3_object_id)) + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + let result = from_slice::( + to_bytes(response.into_body(), usize::MAX) + .await + .unwrap() + .as_bytes(), + ) + .unwrap(); + + assert_eq!(result, first.1); + } +} diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/list.rs 
b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/list.rs index 88b67b652..34053766a 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/list.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/list.rs @@ -1,14 +1,15 @@ //! Route logic for list API calls. //! +use axum::extract::State; +use axum::Json; +use serde::Deserialize; + use crate::database::entities::object_group::Model as ObjectGroup; use crate::database::entities::s3_object::Model as S3Object; use crate::error::Result; use crate::queries::list::ListQueryBuilder; use crate::routes::AppState; -use axum::extract::State; -use axum::Json; -use serde::Deserialize; /// Params for a list object groups request. #[derive(Debug, Deserialize)] @@ -45,3 +46,141 @@ pub async fn count_s3_objects(state: State) -> Result> { Ok(Json(query.count_s3_objects().await?)) } + +#[cfg(test)] +mod tests { + use axum::body::to_bytes; + use axum::body::Body; + use axum::http::Request; + use parquet::data_type::AsBytes; + use serde_json::from_slice; + use sqlx::PgPool; + use tower::ServiceExt; + + use crate::database::aws::migration::tests::MIGRATOR; + use crate::database::entities::object_group::Model as ObjectGroup; + use crate::database::entities::s3_object::Model as S3Object; + use crate::database::Client; + use crate::queries::tests::initialize_database; + use crate::routes::query_router; + + #[sqlx::test(migrator = "MIGRATOR")] + async fn list_object_groups_api(pool: PgPool) { + let client = Client::from_pool(pool); + let entries = initialize_database(&client, 10).await; + + let app = query_router(client); + let response = app + .oneshot( + Request::builder() + .uri("/object_groups") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + let result = from_slice::>( + to_bytes(response.into_body(), usize::MAX) + .await + .unwrap() + .as_bytes(), + ) + .unwrap(); + + assert_eq!( + result, + entries + .into_iter() + .map(|(entry, _)| entry) + .collect::>() 
+ ); + } + + #[sqlx::test(migrator = "MIGRATOR")] + async fn list_s3_objects_api(pool: PgPool) { + let client = Client::from_pool(pool); + let entries = initialize_database(&client, 10).await; + + let app = query_router(client); + let response = app + .oneshot( + Request::builder() + .uri("/s3_objects") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + let result = from_slice::>( + to_bytes(response.into_body(), usize::MAX) + .await + .unwrap() + .as_bytes(), + ) + .unwrap(); + + assert_eq!( + result, + entries + .into_iter() + .map(|(_, entry)| entry) + .collect::>() + ); + } + + #[sqlx::test(migrator = "MIGRATOR")] + async fn count_object_groups_api(pool: PgPool) { + let client = Client::from_pool(pool); + initialize_database(&client, 10).await; + + let app = query_router(client); + let response = app + .oneshot( + Request::builder() + .uri("/object_groups/count") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + let result = from_slice::( + to_bytes(response.into_body(), usize::MAX) + .await + .unwrap() + .as_bytes(), + ) + .unwrap(); + + assert_eq!(result, 10); + } + + #[sqlx::test(migrator = "MIGRATOR")] + async fn count_s3_objects_api(pool: PgPool) { + let client = Client::from_pool(pool); + initialize_database(&client, 10).await; + + let app = query_router(client); + let response = app + .oneshot( + Request::builder() + .uri("/s3_objects/count") + .body(Body::empty()) + .unwrap(), + ) + .await + .unwrap(); + + let result = from_slice::( + to_bytes(response.into_body(), usize::MAX) + .await + .unwrap() + .as_bytes(), + ) + .unwrap(); + + assert_eq!(result, 10); + } +} diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/mod.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/mod.rs index e2b53cfca..d75f241f6 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/mod.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/mod.rs @@ -1,8 +1,11 @@ 
//! This module handles API routing. //! -pub mod get; -pub mod list; +use axum::http::StatusCode; +use axum::response::{IntoResponse, Response}; +use axum::routing::get; +use axum::{Json, Router}; +use serde::Serialize; use crate::database::Client; use crate::error::Error; @@ -10,11 +13,9 @@ use crate::routes::get::{get_object_group_by_id, get_s3_object_by_id}; use crate::routes::list::{ count_object_groups, count_s3_objects, list_object_groups, list_s3_objects, }; -use axum::http::StatusCode; -use axum::response::{IntoResponse, Response}; -use axum::routing::get; -use axum::{Json, Router}; -use serde::Serialize; + +pub mod get; +pub mod list; #[derive(Debug, Clone)] pub struct AppState { @@ -35,7 +36,7 @@ pub fn query_router(database: Client) -> Router { .with_state(state) } -/// The errro response format returned in the API. +/// The error response format returned in the API. #[derive(Serialize)] pub struct ErrorResponse { message: String, @@ -54,3 +55,37 @@ impl IntoResponse for Error { (status, Json(ErrorResponse { message })).into_response() } } + +#[cfg(test)] +mod tests { + use axum::body::to_bytes; + use axum::http::StatusCode; + use axum::response::IntoResponse; + use parquet::data_type::AsBytes; + use serde_json::{from_slice, json, Value}; + + use crate::error::Error; + + #[tokio::test] + async fn sql_error_into_response() { + let response = Error::SQLError("error".to_string()).into_response(); + assert_eq!(response.status(), StatusCode::NOT_FOUND); + + assert_eq!( + json!({"message": "error"}), + from_slice::( + to_bytes(response.into_body(), usize::MAX) + .await + .unwrap() + .as_bytes() + ) + .unwrap() + ); + } + + #[tokio::test] + async fn internal_error_into_response() { + let response = Error::MigrateError("error".to_string()).into_response(); + assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR); + } +} From 7a0921c465951d5325ec381246fea79e987d4e7b Mon Sep 17 00:00:00 2001 From: Marko Malenic Date: Mon, 24 Jun 2024 18:20:01 +1000 
Subject: [PATCH 09/12] refactor(filemanager): rename object_id to object_group_id --- .../0003_rename_object_to_group.sql | 2 ++ .../api/select_existing_by_bucket_key.sql | 2 +- .../aws/insert_s3_created_objects.sql | 4 +-- .../aws/insert_s3_deleted_objects.sql | 4 +-- .../ingester/aws/insert_s3_objects.sql | 4 +-- .../aws/update_reordered_for_created.sql | 2 +- .../aws/update_reordered_for_deleted.sql | 2 +- .../queries/ingester/insert_object_groups.sql | 2 +- .../filemanager/src/database/aws/ingester.rs | 10 +++---- .../src/database/aws/ingester_paired.rs | 6 ++--- .../filemanager/src/database/mod.rs | 4 +-- .../filemanager/src/events/aws/collecter.rs | 2 +- .../filemanager/src/events/aws/inventory.rs | 2 +- .../filemanager/src/events/aws/message.rs | 2 +- .../filemanager/src/events/aws/mod.rs | 26 +++++++++---------- .../filemanager/src/queries/get.rs | 5 +++- .../filemanager/src/queries/mod.rs | 6 ++--- .../filemanager/filemanager/src/routes/get.rs | 2 +- 18 files changed, 46 insertions(+), 41 deletions(-) diff --git a/lib/workload/stateless/stacks/filemanager/database/migrations/0003_rename_object_to_group.sql b/lib/workload/stateless/stacks/filemanager/database/migrations/0003_rename_object_to_group.sql index 05013761f..b2ff29a7d 100644 --- a/lib/workload/stateless/stacks/filemanager/database/migrations/0003_rename_object_to_group.sql +++ b/lib/workload/stateless/stacks/filemanager/database/migrations/0003_rename_object_to_group.sql @@ -1 +1,3 @@ alter table object rename to object_group; +alter table object_group rename object_id to object_group_id; +alter table s3_object rename object_id to object_group_id; \ No newline at end of file diff --git a/lib/workload/stateless/stacks/filemanager/database/queries/api/select_existing_by_bucket_key.sql b/lib/workload/stateless/stacks/filemanager/database/queries/api/select_existing_by_bucket_key.sql index 8040dbdbc..27aad178e 100644 --- 
a/lib/workload/stateless/stacks/filemanager/database/queries/api/select_existing_by_bucket_key.sql +++ b/lib/workload/stateless/stacks/filemanager/database/queries/api/select_existing_by_bucket_key.sql @@ -20,7 +20,7 @@ with input as ( ) -- Select objects into a FlatS3EventMessage struct. select - object_id, + object_group_id, s3_object_id, public_id, s3_object.bucket, diff --git a/lib/workload/stateless/stacks/filemanager/database/queries/ingester/aws/insert_s3_created_objects.sql b/lib/workload/stateless/stacks/filemanager/database/queries/ingester/aws/insert_s3_created_objects.sql index 3f8e27934..a01781614 100644 --- a/lib/workload/stateless/stacks/filemanager/database/queries/ingester/aws/insert_s3_created_objects.sql +++ b/lib/workload/stateless/stacks/filemanager/database/queries/ingester/aws/insert_s3_created_objects.sql @@ -1,7 +1,7 @@ -- Bulk insert of s3 objects. insert into s3_object ( s3_object_id, - object_id, + object_group_id, public_id, bucket, key, @@ -34,4 +34,4 @@ values ( unnest($15::event_type[]) ) on conflict on constraint sequencer_unique do update set number_duplicate_events = s3_object.number_duplicate_events + 1 - returning object_id, number_duplicate_events; + returning object_group_id, number_duplicate_events; diff --git a/lib/workload/stateless/stacks/filemanager/database/queries/ingester/aws/insert_s3_deleted_objects.sql b/lib/workload/stateless/stacks/filemanager/database/queries/ingester/aws/insert_s3_deleted_objects.sql index fcbf50719..c9d1a130b 100644 --- a/lib/workload/stateless/stacks/filemanager/database/queries/ingester/aws/insert_s3_deleted_objects.sql +++ b/lib/workload/stateless/stacks/filemanager/database/queries/ingester/aws/insert_s3_deleted_objects.sql @@ -1,7 +1,7 @@ -- Bulk insert of s3 objects. 
insert into s3_object ( s3_object_id, - object_id, + object_group_id, public_id, bucket, key, @@ -36,4 +36,4 @@ values ( unnest($16::event_type[]) ) on conflict on constraint deleted_sequencer_unique do update set number_duplicate_events = s3_object.number_duplicate_events + 1 - returning object_id, number_duplicate_events; + returning object_group_id, number_duplicate_events; diff --git a/lib/workload/stateless/stacks/filemanager/database/queries/ingester/aws/insert_s3_objects.sql b/lib/workload/stateless/stacks/filemanager/database/queries/ingester/aws/insert_s3_objects.sql index f020ef202..cdc1db487 100644 --- a/lib/workload/stateless/stacks/filemanager/database/queries/ingester/aws/insert_s3_objects.sql +++ b/lib/workload/stateless/stacks/filemanager/database/queries/ingester/aws/insert_s3_objects.sql @@ -1,6 +1,6 @@ -- Bulk insert of s3 objects. insert into s3_object ( - object_id, + object_group_id, s3_object_id, public_id, bucket, @@ -34,4 +34,4 @@ values ( unnest($15::event_type[]) ) on conflict on constraint sequencer_unique do update set number_duplicate_events = s3_object.number_duplicate_events + 1 -returning object_id, number_duplicate_events; +returning object_group_id, number_duplicate_events; diff --git a/lib/workload/stateless/stacks/filemanager/database/queries/ingester/aws/update_reordered_for_created.sql b/lib/workload/stateless/stacks/filemanager/database/queries/ingester/aws/update_reordered_for_created.sql index 5140bee47..c2b28c4fe 100644 --- a/lib/workload/stateless/stacks/filemanager/database/queries/ingester/aws/update_reordered_for_created.sql +++ b/lib/workload/stateless/stacks/filemanager/database/queries/ingester/aws/update_reordered_for_created.sql @@ -114,7 +114,7 @@ update as ( select -- Note, this is the passed through value from the input in order to identify this event later. 
input_id as "s3_object_id!", - object_id, + object_group_id, public_id, bucket, key, diff --git a/lib/workload/stateless/stacks/filemanager/database/queries/ingester/aws/update_reordered_for_deleted.sql b/lib/workload/stateless/stacks/filemanager/database/queries/ingester/aws/update_reordered_for_deleted.sql index 4c0484353..42305580d 100644 --- a/lib/workload/stateless/stacks/filemanager/database/queries/ingester/aws/update_reordered_for_deleted.sql +++ b/lib/workload/stateless/stacks/filemanager/database/queries/ingester/aws/update_reordered_for_deleted.sql @@ -85,7 +85,7 @@ update as ( select -- Note, this is the passed through value from the input in order to identify this event later. input_id as "s3_object_id!", - object_id, + object_group_id, public_id, bucket, key, diff --git a/lib/workload/stateless/stacks/filemanager/database/queries/ingester/insert_object_groups.sql b/lib/workload/stateless/stacks/filemanager/database/queries/ingester/insert_object_groups.sql index 121028ce5..d247c1bcf 100644 --- a/lib/workload/stateless/stacks/filemanager/database/queries/ingester/insert_object_groups.sql +++ b/lib/workload/stateless/stacks/filemanager/database/queries/ingester/insert_object_groups.sql @@ -1,5 +1,5 @@ -- Bulk insert of objects -insert into object_group (object_id) +insert into object_group (object_group_id) values ( unnest($1::uuid[]) ); diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/ingester.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/ingester.rs index d4da2f212..866606e5b 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/ingester.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/ingester.rs @@ -21,7 +21,7 @@ pub struct Ingester { /// The type representing an insert query. 
#[derive(Debug)] struct Insert { - object_id: Uuid, + object_group_id: Uuid, number_duplicate_events: i64, } @@ -48,13 +48,13 @@ impl Ingester { // If we cannot find the object in our new ids, this object already exists. let pos = inserted.iter().rposition(|record| { // This will never be `None`, maybe this is an sqlx bug? - record.object_id == object_id + record.object_group_id == object_id })?; // We can remove this to avoid searching over it again. let record = inserted.remove(pos); debug!( - object_id = ?record.object_id, + object_id = ?record.object_group_id, number_duplicate_events = record.number_duplicate_events, "duplicate event found" ); @@ -76,7 +76,7 @@ impl Ingester { let mut inserted = query_file_as!( Insert, "../database/queries/ingester/aws/insert_s3_objects.sql", - &events.object_ids, + &events.object_group_ids, &events.s3_object_ids, &events.public_ids, &events.buckets, @@ -95,7 +95,7 @@ impl Ingester { .fetch_all(&mut *tx) .await?; - let object_ids = Self::reprocess_inserts(events.object_ids, &mut inserted); + let object_ids = Self::reprocess_inserts(events.object_group_ids, &mut inserted); // Insert only the non duplicate events. if !object_ids.is_empty() { debug!( diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/ingester_paired.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/ingester_paired.rs index 81db6d78b..a8209ea38 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/ingester_paired.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/ingester_paired.rs @@ -26,7 +26,7 @@ pub struct IngesterPaired { /// The type representing an insert query. #[derive(Debug)] struct Insert { - object_id: Uuid, + object_group_id: Uuid, number_duplicate_events: i64, } @@ -94,13 +94,13 @@ impl IngesterPaired { // If we cannot find the object in our new ids, this object already exists. 
let pos = inserted.iter().rposition(|record| { // This will never be `None`, maybe this is an sqlx bug? - record.object_id == object_id + record.object_group_id == object_id })?; // We can remove this to avoid searching over it again. let record = inserted.remove(pos); debug!( - object_id = ?record.object_id, + object_id = ?record.object_group_id, number_duplicate_events = record.number_duplicate_events, "duplicate event found" ); diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/mod.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/mod.rs index 9ee9e2b70..2d2d9404d 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/mod.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/mod.rs @@ -202,7 +202,7 @@ pub(crate) mod tests { let inserted = query!( "select s3_object_id as \"s3_object_id!\", - object_id as \"object_id!\", + object_group_id as \"object_group_id!\", bucket, key, date, @@ -264,7 +264,7 @@ pub(crate) mod tests { let inserted = query!( "select s3_object_id as \"s3_object_id!\", - object_id as \"object_id!\", + object_group_id as \"object_group_id!\", bucket, key, date, diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/collecter.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/collecter.rs index bf76d76da..e098b612b 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/collecter.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/collecter.rs @@ -267,7 +267,7 @@ pub(crate) mod tests { .for_each(|(expected_event, event)| { // The object id will be different for each event. 
expected_event.s3_object_id = event.s3_object_id; - expected_event.object_id = event.object_id; + expected_event.object_group_id = event.object_group_id; expected_event.public_id = event.public_id; }); diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/inventory.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/inventory.rs index d892044b4..bed7e2c1d 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/inventory.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/inventory.rs @@ -517,7 +517,7 @@ impl From for FlatS3EventMessage { } = record; Self { - object_id: UuidGenerator::generate(), + object_group_id: UuidGenerator::generate(), s3_object_id: UuidGenerator::generate(), public_id: UuidGenerator::generate(), // We don't know when this object was created so there is no event time. diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/message.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/message.rs index 54bb7e721..a65f4d26a 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/message.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/message.rs @@ -184,7 +184,7 @@ impl From for FlatS3EventMessage { } = EventTypeDeleteMarker::from(EventTypeData::new(detail_type, deletion_type)); Self { - object_id: UuidGenerator::generate(), + object_group_id: UuidGenerator::generate(), s3_object_id: UuidGenerator::generate(), public_id: UuidGenerator::generate(), event_time: Some(time), diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/mod.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/mod.rs index 612f83aba..4c48e8d57 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/mod.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/mod.rs @@ -65,7 +65,7 @@ 
impl StorageClass { /// the database structs do not have to perform this conversion. #[derive(Debug, Eq, PartialEq, Default, Clone)] pub struct TransposedS3EventMessages { - pub object_ids: Vec, + pub object_group_ids: Vec, pub s3_object_ids: Vec, pub public_ids: Vec, pub event_times: Vec>>, @@ -87,7 +87,7 @@ impl TransposedS3EventMessages { /// TODO: There was a S3 messaging spec about how long those fields are supposed to be? pub fn with_capacity(capacity: usize) -> Self { Self { - object_ids: Vec::with_capacity(capacity), + object_group_ids: Vec::with_capacity(capacity), s3_object_ids: Vec::with_capacity(capacity), public_ids: Vec::with_capacity(capacity), event_times: Vec::with_capacity(capacity), @@ -108,7 +108,7 @@ impl TransposedS3EventMessages { /// Push an S3 event message. pub fn push(&mut self, message: FlatS3EventMessage) { let FlatS3EventMessage { - object_id, + object_group_id, s3_object_id, public_id, event_time, @@ -126,7 +126,7 @@ impl TransposedS3EventMessages { .. } = message; - self.object_ids.push(object_id); + self.object_group_ids.push(object_group_id); self.s3_object_ids.push(s3_object_id); self.public_ids.push(public_id); self.event_times.push(event_time); @@ -165,7 +165,7 @@ impl From for TransposedS3EventMessages { impl From for FlatS3EventMessages { fn from(messages: TransposedS3EventMessages) -> Self { let zip = izip!( - messages.object_ids, + messages.object_group_ids, messages.s3_object_ids, messages.public_ids, messages.event_times, @@ -183,7 +183,7 @@ impl From for FlatS3EventMessages { ) .map( |( - object_id, + object_group_id, s3_object_id, public_id, event_time, @@ -200,7 +200,7 @@ impl From for FlatS3EventMessages { is_delete_marker, )| { FlatS3EventMessage { - object_id, + object_group_id, s3_object_id, public_id, sequencer, @@ -435,7 +435,7 @@ impl FlatS3EventMessages { /// A flattened AWS S3 record #[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Clone, Default)] pub struct FlatS3EventMessage { - pub object_id: Uuid, + pub 
object_group_id: Uuid, pub s3_object_id: Uuid, pub public_id: Uuid, pub sequencer: Option, @@ -459,13 +459,13 @@ impl FlatS3EventMessage { pub fn new_with_generated_id() -> Self { Self::default() .with_s3_object_id(UuidGenerator::generate()) - .with_object_id(UuidGenerator::generate()) + .with_object_group_id(UuidGenerator::generate()) .with_public_id(UuidGenerator::generate()) } /// Create an event with a newly generated s3_object_id. pub fn regenerate_ids(mut self) -> Self { - self.object_id = UuidGenerator::generate(); + self.object_group_id = UuidGenerator::generate(); self.s3_object_id = UuidGenerator::generate(); self.public_id = UuidGenerator::generate(); self @@ -513,9 +513,9 @@ impl FlatS3EventMessage { self } - /// Set the object id. - pub fn with_object_id(mut self, object_id: Uuid) -> Self { - self.object_id = object_id; + /// Set the object group id. + pub fn with_object_group_id(mut self, object_id: Uuid) -> Self { + self.object_group_id = object_id; self } diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/get.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/get.rs index e4bd8fec0..e42ea7343 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/get.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/get.rs @@ -64,7 +64,10 @@ mod tests { let first = entries.first().unwrap(); let builder = GetQueryBuilder::new(&client); - let result = builder.get_object_group(first.0.object_id).await.unwrap(); + let result = builder + .get_object_group(first.0.object_group_id) + .await + .unwrap(); assert_eq!(result.as_ref(), Some(&first.0)); } diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/mod.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/mod.rs index 1e33bbf40..e8f162b33 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/mod.rs +++ 
b/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/mod.rs @@ -48,18 +48,18 @@ pub(crate) mod tests { } pub(crate) fn generate_entry(index: usize) -> (ActiveObjectGroup, ActiveS3Object) { - let object_id = UuidGenerator::generate(); + let object_group_id = UuidGenerator::generate(); let event = event_type(index); let date = || Set(Some(DateTime::default().add(Days::new(index as u64)))); ( ActiveObjectGroup { - object_id: Set(object_id), + object_group_id: Set(object_group_id), attributes: Set(None), }, ActiveS3Object { s3_object_id: Set(UuidGenerator::generate()), - object_id: Set(object_id), + object_group_id: Set(object_group_id), public_id: Set(UuidGenerator::generate()), event_type: Set(event.clone()), // Half as many buckets as keys. diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/get.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/get.rs index 569e93721..dac82459e 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/get.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/get.rs @@ -67,7 +67,7 @@ mod tests { let response = app .oneshot( Request::builder() - .uri(format!("/object_groups/{}", first.0.object_id)) + .uri(format!("/object_groups/{}", first.0.object_group_id)) .body(Body::empty()) .unwrap(), ) From 71408e4c0c8940791b0edb069ba944557fe96c5e Mon Sep 17 00:00:00 2001 From: Marko Malenic Date: Mon, 24 Jun 2024 18:28:39 +1000 Subject: [PATCH 10/12] style(filemanager): add space to migration script --- .../database/migrations/0003_rename_object_to_group.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/workload/stateless/stacks/filemanager/database/migrations/0003_rename_object_to_group.sql b/lib/workload/stateless/stacks/filemanager/database/migrations/0003_rename_object_to_group.sql index b2ff29a7d..f2f26a172 100644 --- a/lib/workload/stateless/stacks/filemanager/database/migrations/0003_rename_object_to_group.sql +++ 
b/lib/workload/stateless/stacks/filemanager/database/migrations/0003_rename_object_to_group.sql @@ -1,3 +1,3 @@ alter table object rename to object_group; alter table object_group rename object_id to object_group_id; -alter table s3_object rename object_id to object_group_id; \ No newline at end of file +alter table s3_object rename object_id to object_group_id; From 729cc35eee52428462955c5d63c497b460833527 Mon Sep 17 00:00:00 2001 From: Marko Malenic Date: Fri, 28 Jun 2024 09:08:30 +1000 Subject: [PATCH 11/12] test(filemanager): improve migration test --- .../filemanager/src/database/aws/migration.rs | 34 +++++++++++-------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/migration.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/migration.rs index 657526fdb..1e855ee5e 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/migration.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/migration.rs @@ -56,7 +56,9 @@ impl Migrate for Migration { #[cfg(test)] pub(crate) mod tests { use lazy_static::lazy_static; + use sqlx::postgres::PgRow; use sqlx::PgPool; + use sqlx::Row; use super::*; @@ -68,24 +70,28 @@ pub(crate) mod tests { async fn test_migrate(pool: PgPool) { let migrate = Migration::new(Client::from_pool(pool)); - let not_exists = sqlx::query!( - "select exists (select from information_schema.tables where table_name = 'object')" - ) - .fetch_one(migrate.client.pool()) - .await - .unwrap(); + let object_exists = check_table_exists(&migrate, "object").await; + let s3_object_exists = check_table_exists(&migrate, "s3_object").await; - assert!(!not_exists.exists.unwrap()); + assert!(!object_exists.get::("exists")); + assert!(!s3_object_exists.get::("exists")); migrate.migrate().await.unwrap(); - let exists = sqlx::query!( - "select exists (select from information_schema.tables where table_name = 
'object')" - ) - .fetch_one(migrate.client.pool()) - .await - .unwrap(); + let object_exists = check_table_exists(&migrate, "object").await; + let s3_object_exists = check_table_exists(&migrate, "s3_object").await; - assert!(exists.exists.unwrap()); + assert!(object_exists.get::("exists")); + assert!(s3_object_exists.get::("exists")); + } + + async fn check_table_exists(migration: &Migration, table_name: &str) -> PgRow { + sqlx::query(&format!( + "select exists (select from information_schema.tables where table_name = '{}')", + table_name + )) + .fetch_one(migration.client().pool()) + .await + .unwrap() } } From ca5a69a6a3d1e972ccdbcdb976ba32086fb1e52d Mon Sep 17 00:00:00 2001 From: Marko Malenic Date: Fri, 28 Jun 2024 09:24:13 +1000 Subject: [PATCH 12/12] fix(filemanager) environment variable config --- .../src/database/aws/credentials.rs | 2 +- .../stacks/filemanager/filemanager/src/env.rs | 80 ++++++++++++------- .../filemanager/src/events/aws/collecter.rs | 2 +- 3 files changed, 53 insertions(+), 31 deletions(-) diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/credentials.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/credentials.rs index 00812bff4..3024acd4f 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/credentials.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/credentials.rs @@ -209,7 +209,7 @@ mod tests { pghost: Some("127.0.0.1".to_string()), pgport: Some(5432), pguser: Some("filemanager".to_string()), - sqs_queue_url: None, + sqs_url: None, paired_ingest_mode: false, }; diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/env.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/env.rs index e3fe32d2c..63cbdfe21 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/env.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/env.rs @@ -4,44 +4,22 @@ use 
crate::error::Error::LoadingEnvironment; use crate::error::Result; use envy::from_env; -use serde::de::Error; -use serde::{Deserialize, Deserializer}; -use std::result; +use serde::Deserialize; /// Configuration environment variables for filemanager. -#[derive(Debug, Deserialize, Default)] +#[derive(Debug, Deserialize, Default, Eq, PartialEq)] pub struct Config { pub(crate) database_url: Option, pub(crate) pgpassword: Option, pub(crate) pghost: Option, pub(crate) pgport: Option, pub(crate) pguser: Option, - pub(crate) sqs_queue_url: Option, - #[serde(deserialize_with = "deserialize_bool_with_num")] + #[serde(rename = "filemanager_sqs_url")] + pub(crate) sqs_url: Option, + #[serde(default, rename = "filemanager_paired_ingest_mode")] pub(crate) paired_ingest_mode: bool, } -fn deserialize_bool_with_num<'de, D>(deserializer: D) -> result::Result -where - D: Deserializer<'de>, -{ - let value: Option = Deserialize::deserialize(deserializer)?; - - Ok(value - .map(|value| { - if value == "1" { - Ok(true) - } else if value == "0" { - Ok(false) - } else { - value.parse::() - } - }) - .transpose() - .map_err(Error::custom)? - .unwrap_or_default()) -} - impl Config { /// Load environment variables into a `Config` struct. pub fn load() -> Result { @@ -74,8 +52,8 @@ impl Config { } /// Get the SQS url. - pub fn sqs_queue_url(&self) -> Option<&str> { - self.sqs_queue_url.as_deref() + pub fn sqs_url(&self) -> Option<&str> { + self.sqs_url.as_deref() } /// Get the paired ingest mode. 
@@ -95,3 +73,47 @@ impl Config { value.ok_or_else(|| LoadingEnvironment("missing environment variable".to_string())) } } + +#[cfg(test)] +mod tests { + use envy::from_iter; + + use super::*; + + #[test] + fn test_environment() { + let data = vec![ + ("DATABASE_URL", "url"), + ("PGPASSWORD", "password"), + ("PGHOST", "host"), + ("PGPORT", "1234"), + ("PGUSER", "user"), + ("FILEMANAGER_SQS_URL", "url"), + ("FILEMANAGER_PAIRED_INGEST_MODE", "true"), + ] + .into_iter() + .map(|(key, value)| (key.to_string(), value.to_string())); + + let config: Config = from_iter(data).unwrap(); + + assert_eq!( + config, + Config { + database_url: Some("url".to_string()), + pgpassword: Some("password".to_string()), + pghost: Some("host".to_string()), + pgport: Some(1234), + pguser: Some("user".to_string()), + sqs_url: Some("url".to_string()), + paired_ingest_mode: true, + } + ) + } + + #[test] + fn test_environment_defaults() { + let config: Config = from_iter(vec![]).unwrap(); + + assert_eq!(config, Default::default()); + } +} diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/collecter.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/collecter.rs index bf76d76da..e3fa279f2 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/collecter.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/collecter.rs @@ -97,7 +97,7 @@ impl CollecterBuilder { /// Build a collector by manually calling receive to obtain the raw events. pub async fn build_receive(mut self, config: &Config) -> Result { let url = self.sqs_url.take(); - let url = Config::value_or_else(url.as_deref(), config.sqs_queue_url())?; + let url = Config::value_or_else(url.as_deref(), config.sqs_url())?; let client = self.sqs_client.take(); if let Some(sqs_client) = &client {