diff --git a/lib/workload/stateless/stacks/filemanager/Cargo.lock b/lib/workload/stateless/stacks/filemanager/Cargo.lock index 90106cb3a..d8bb154fb 100644 --- a/lib/workload/stateless/stacks/filemanager/Cargo.lock +++ b/lib/workload/stateless/stacks/filemanager/Cargo.lock @@ -891,6 +891,61 @@ dependencies = [ "serde_with", ] +[[package]] +name = "axum" +version = "0.7.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a6c9af12842a67734c9a2e355436e5d03b22383ed60cf13cd0c18fbfe3dcbcf" +dependencies = [ + "async-trait", + "axum-core", + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", + "hyper 1.3.1", + "hyper-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sync_wrapper 1.0.1", + "tokio", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "axum-core" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a15c63fd72d41492dc4f497196f5da1fb04fb7529e631d73630d1b491e47a2e3" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper 0.1.2", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "backtrace" version = "0.3.71" @@ -1624,10 +1679,12 @@ dependencies = [ "aws-smithy-mocks-experimental", "aws-smithy-runtime-api", "aws_lambda_events", + "axum", "bytes", "chrono", "csv", "dotenvy", + "envy", "filemanager", "filemanager-build", "flate2", @@ -1647,8 +1704,10 @@ dependencies = [ "serde_json", "serde_with", "sqlx", + "strum 0.26.2", "thiserror", "tokio", + "tower", "tracing", "tracing-subscriber", "url", @@ -2155,6 +2214,7 @@ dependencies = [ "http 1.1.0", "http-body 1.0.0", "httparse", + "httpdate", "itoa", "pin-project-lite", "smallvec", @@ -2561,6 +2621,12 @@ dependencies = [ "regex-automata 0.1.10", ] +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "md-5" version = "0.10.6" @@ -4332,6 +4398,9 @@ name = "strum" version = "0.26.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5d8cec3501a5194c432b2b7976db6b7d10ec95c253208b45f83f7136aa985e29" +dependencies = [ + "strum_macros", +] [[package]] name = "strum_macros" @@ -4407,6 +4476,18 @@ dependencies = [ "syn 2.0.65", ] +[[package]] +name = "sync_wrapper" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" + +[[package]] +name = "sync_wrapper" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394" + [[package]] name = "tap" version = "1.0.1" diff --git a/lib/workload/stateless/stacks/filemanager/filemanager-build/src/error.rs b/lib/workload/stateless/stacks/filemanager/filemanager-build/src/error.rs index 5006c469f..9207e77b6 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager-build/src/error.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager-build/src/error.rs @@ -1,15 +1,17 @@ //! This module contains the crate's error types. //! 
-use crate::error::ErrorKind::{IoError, LoadingEnvironment}; -use crate::workspace_path; -use miette::{diagnostic, Diagnostic, NamedSource, SourceOffset}; use std::fmt::{Display, Formatter}; use std::fs::read_to_string; use std::panic::Location; use std::{io, result}; + +use miette::{diagnostic, Diagnostic, NamedSource, SourceOffset}; use thiserror::Error; +use crate::error::ErrorKind::{IoError, LoadingEnvironment}; +use crate::workspace_path; + pub type Result<T> = result::Result<T, Error>; /// Error types for the filemanager. diff --git a/lib/workload/stateless/stacks/filemanager/filemanager-build/src/gen_entities.rs b/lib/workload/stateless/stacks/filemanager/filemanager-build/src/gen_entities.rs index 08237fa74..db782e8d3 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager-build/src/gen_entities.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager-build/src/gen_entities.rs @@ -2,14 +2,16 @@ //! database schema. //! -use crate::error::ErrorKind::EntityGeneration; -use crate::error::{Error, Result}; -use crate::Config; +use std::ffi::OsStr; +use std::fs::write; + use clap_builder::Parser; use quote::quote; use sea_orm_cli::{run_generate_command, Cli, Commands}; -use std::ffi::OsStr; -use std::fs::write; + +use crate::error::ErrorKind::EntityGeneration; +use crate::error::{Error, Result}; +use crate::Config; pub async fn generate_entities() -> Result<()> { let config = Config::load()?; @@ -19,6 +21,14 @@ pub async fn generate_entities() -> Result<()> { "sea-orm-cli", "generate", "entity", + "--with-serde", + "both", + "--enum-extra-derives", + "strum::FromRepr", + "--enum-extra-derives", + "strum::EnumCount", + "--enum-extra-attributes", + "repr(u8)", "-u", &config.database_url, "-o", @@ -44,6 +54,5 @@ pub async fn generate_entities() -> Result<()> { let out_file = out_dir.join("generated.rs"); write(out_file, format!("{}\n\n{}", generated_comment, generated))?; - Ok(()) } diff --git a/lib/workload/stateless/stacks/filemanager/filemanager-build/src/lib.rs b/lib/workload/stateless/stacks/filemanager/filemanager-build/src/lib.rs index 430e49c7b..da643acb7 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager-build/src/lib.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager-build/src/lib.rs @@ -1,14 +1,18 @@ -use crate::error::Error; -use crate::error::ErrorKind::LoadingEnvironment; -use envy::from_env; -use error::Result; -use serde::Deserialize; use std::env::var; use std::path::{Path, PathBuf}; +use envy::from_env; +use serde::Deserialize; + +use error::Result; + +use crate::error::Error; +use crate::error::ErrorKind::LoadingEnvironment; + pub mod error; pub mod gen_entities; +/// Configuration environment variables for the build process. #[derive(Debug, Deserialize)] pub struct Config { database_url: String, @@ -16,6 +20,7 @@ pub struct Config { } impl Config { + /// Load environment variables into a `Config` struct. #[track_caller] pub fn load() -> Result<Self> { Ok(from_env::<Self>()?)
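The new `--enum-extra-derives` and `--enum-extra-attributes` flags above change what sea-orm-cli emits for database enums. A minimal sketch of the effect, assuming strum 0.26 with the `derive` feature; the variant names here are illustrative stand-ins, and the real generated type also carries sea-orm's active-enum derives:

use strum::{EnumCount, FromRepr};

// Illustrative stand-in for the generated `sea_orm_active_enums::EventType`.
#[derive(Debug, PartialEq, FromRepr, EnumCount)]
#[repr(u8)]
pub enum EventType {
    Created,
    Deleted,
    Other,
}

fn main() {
    // `FromRepr` maps a `u8` discriminant back to a variant, returning `None` out of range.
    assert_eq!(EventType::from_repr(0), Some(EventType::Created));
    assert_eq!(EventType::from_repr(3), None);
    // `EnumCount` exposes the number of variants as an associated constant.
    assert_eq!(EventType::COUNT, 3);
}

The test helpers added later in queries/mod.rs rely on exactly this pair of derives to generate deterministic enum values from an index.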
diff --git a/lib/workload/stateless/stacks/filemanager/filemanager-http-lambda/src/main.rs b/lib/workload/stateless/stacks/filemanager/filemanager-http-lambda/src/main.rs index 39c403091..035ccc17b 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager-http-lambda/src/main.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager-http-lambda/src/main.rs @@ -1,3 +1,4 @@ +use filemanager::env::Config; use lambda_http::Error; use lambda_runtime::{run, service_fn, LambdaEvent}; @@ -11,15 +12,17 @@ use filemanager::handlers::init_tracing; async fn main() -> Result<(), Error> { init_tracing(); - let options = &create_database_pool().await?; + let config = &Config::load()?; + let options = &create_database_pool(config).await?; run(service_fn(|_: LambdaEvent<()>| async move { - update_credentials(options).await?; + update_credentials(options, config).await?; receive_and_ingest( S3Client::with_defaults().await, SQSClient::with_defaults().await, None::<String>, - DbClient::from_ref(options), + DbClient::new(options.clone()), + config, ) .await?; diff --git a/lib/workload/stateless/stacks/filemanager/filemanager-ingest-lambda/src/main.rs b/lib/workload/stateless/stacks/filemanager/filemanager-ingest-lambda/src/main.rs index e686f5cf9..a50033e02 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager-ingest-lambda/src/main.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager-ingest-lambda/src/main.rs @@ -3,6 +3,7 @@ use lambda_runtime::{run, service_fn, Error, LambdaEvent}; use filemanager::clients::aws::s3::Client; use filemanager::database::Client as DbClient; +use filemanager::env::Config; use filemanager::handlers::aws::{create_database_pool, ingest_event, update_credentials}; use filemanager::handlers::init_tracing; @@ -10,14 +11,16 @@ async fn main() -> Result<(), Error> { init_tracing(); - let options = &create_database_pool().await?; + let config = &Config::load()?; + let options = &create_database_pool(config).await?; run(service_fn(|event: LambdaEvent<SqsEvent>| async move { - update_credentials(options).await?; + update_credentials(options, config).await?; ingest_event( event.payload, Client::with_defaults().await, - DbClient::from_ref(options), + DbClient::new(options.clone()), + config, ) .await?; diff --git a/lib/workload/stateless/stacks/filemanager/filemanager-inventory-lambda/src/main.rs b/lib/workload/stateless/stacks/filemanager/filemanager-inventory-lambda/src/main.rs index 2caa274b0..719d6eff6 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager-inventory-lambda/src/main.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager-inventory-lambda/src/main.rs @@ -3,6 +3,7 @@ use lambda_runtime::{run, service_fn, Error, LambdaEvent}; use serde::Deserialize; use filemanager::database::Client as DbClient; +use filemanager::env::Config; use filemanager::events::aws::inventory::Manifest; use filemanager::handlers::aws::{create_database_pool, ingest_s3_inventory, update_credentials}; use filemanager::handlers::init_tracing; @@ -26,12 +27,13 @@ pub struct BucketKey { async fn main() -> Result<(), Error> { init_tracing(); - let options = &create_database_pool().await?; + let config = &Config::load()?; + let options = &create_database_pool(config).await?; run(service_fn(|event: LambdaEvent<Request>| async move { - update_credentials(options).await?; + update_credentials(options, config).await?; let client = Client::with_defaults().await; - let database = DbClient::from_ref(options); + let database =
DbClient::new(options.clone()); match event.payload { Request::BucketKey(bucket_key) => { @@ -41,11 +43,12 @@ async fn main() -> Result<(), Error> { Some(bucket_key.bucket), Some(bucket_key.key), None, + config, ) .await? } Request::Manifest(manifest) => { - ingest_s3_inventory(client, database, None, None, Some(manifest)).await? + ingest_s3_inventory(client, database, None, None, Some(manifest), config).await? } }; diff --git a/lib/workload/stateless/stacks/filemanager/filemanager-migrate-lambda/src/main.rs b/lib/workload/stateless/stacks/filemanager/filemanager-migrate-lambda/src/main.rs index 4eef907f4..cdf757396 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager-migrate-lambda/src/main.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager-migrate-lambda/src/main.rs @@ -7,6 +7,7 @@ use crate::Event::Provider; use filemanager::database::aws::migration::Migration; use filemanager::database::Client as DbClient; use filemanager::database::Migrate; +use filemanager::env::Config; use filemanager::handlers::aws::{create_database_pool, update_credentials}; use filemanager::handlers::init_tracing; @@ -32,9 +33,10 @@ pub enum CloudFormationRequest { async fn main() -> Result<(), Error> { init_tracing(); - let options = &create_database_pool().await?; + let config = &Config::load()?; + let options = &create_database_pool(config).await?; run(service_fn(|event: LambdaEvent<Event>| async move { - update_credentials(options).await?; + update_credentials(options, config).await?; // Migrate depending on the type of lifecycle event using the CDK provider framework: // https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.custom_resources-readme.html#provider-framework @@ -47,7 +49,7 @@ async fn main() -> Result<(), Error> { _ => { // If there's nothing to migrate, then this will just return Ok.
Ok::<_, Error>( - Migration::new(DbClient::from_ref(options)) + Migration::new(DbClient::new(options.clone())) .migrate() .await?, ) diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/Cargo.toml b/lib/workload/stateless/stacks/filemanager/filemanager/Cargo.toml index 91e086273..5733849c2 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/Cargo.toml +++ b/lib/workload/stateless/stacks/filemanager/filemanager/Cargo.toml @@ -12,18 +12,28 @@ rust-version.workspace = true migrate = ["sqlx/migrate"] [dependencies] +# Serde serde = { version = "1", features = ["derive"] } serde_json = "1" serde_with = "3" +# Async async-trait = "0.1" futures = "0.3" tokio = { version = "1", features = ["rt-multi-thread", "macros"] } tracing = "0.1" tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt"] } +# Database sqlx = { version = "0.7", features = ["postgres", "runtime-tokio", "tls-rustls", "chrono", "uuid"] } -sea-orm = { version = "0.12", features = ["sqlx-postgres", "runtime-tokio-rustls"] } +sea-orm = { version = "0.12", features = ["sqlx-postgres", "runtime-tokio-rustls", "macros", "sea-orm-internal"] } +strum = { version = "0.26", features = ["derive"] } + +# Query server +axum = "0.7" +tower = "0.4" + +# General chrono = { version = "0.4", features = ["serde"] } thiserror = "1" uuid = { version = "1", features = ["v7"] } @@ -31,6 +41,10 @@ mockall = "0.12" mockall_double = "0.3" itertools = "0.12" url = "2" +bytes = "1.6" +envy = "0.4" + +# Inventory csv = "1" flate2 = "1" md5 = "0.7" @@ -39,8 +53,8 @@ parquet = { version = "50", features = ["async"] } arrow = { version = "50", features = ["chrono-tz"] } arrow-json = "50" orc-rust = "0.3" -bytes = "1.6" +# AWS aws-sdk-sqs = "1" aws-config = "1" aws-sdk-s3 = "1" diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/credentials.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/credentials.rs index f45480768..3024acd4f 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/credentials.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/credentials.rs @@ -14,9 +14,9 @@ use url::Url; #[double] use crate::clients::aws::config::Config; use crate::database::CredentialGenerator; +use crate::env::Config as EnvConfig; use crate::error::Error::CredentialGeneratorError; use crate::error::Result; -use crate::read_env; /// Number of seconds that the IAM credentials expire in. /// Equals the max Lambda timeout. @@ -57,32 +57,18 @@ impl IamGeneratorBuilder { } /// Build the credential generator. - pub async fn build(self) -> Result<IamGenerator> { + pub async fn build(self, env_config: &EnvConfig) -> Result<IamGenerator> { let config = if let Some(config) = self.config { config } else { Config::with_defaults().await }; - let host = if let Some(host) = self.host { - host - } else { - read_env("PGHOST")? - }; - - let port = if let Some(port) = self.port { - port - } else { - read_env("PGPORT")?.parse().map_err(|_| { - CredentialGeneratorError("failed to parse port from PGPORT".to_string()) - })? - }; - - let user = if let Some(user) = self.user { - user - } else { - read_env("PGUSER")?
- }; + let host = + EnvConfig::value_or_else(self.host.as_deref(), env_config.pg_host())?.to_string(); + let port = EnvConfig::value_or_else(self.port, env_config.pg_port())?; + let user = + EnvConfig::value_or_else(self.user.as_deref(), env_config.pg_user())?.to_string(); Ok(IamGenerator::new(config, host, port, user)) } @@ -192,7 +178,6 @@ impl CredentialGenerator for IamGenerator { mod tests { use std::borrow::Cow; use std::collections::HashMap; - use std::env::set_var; use std::future::Future; use aws_credential_types::Credentials; @@ -209,7 +194,7 @@ mod tests { .with_host("127.0.0.1".to_string()) .with_port(5432) .with_user("filemanager".to_string()) - .build() + .build(&Default::default()) .await .unwrap() }) @@ -218,14 +203,20 @@ mod tests { #[tokio::test] async fn generate_iam_token_env() { - set_var("PGHOST", "127.0.0.1"); - set_var("PGPORT", "5432"); - set_var("PGUSER", "filemanager"); + let env_config = EnvConfig { + database_url: None, + pgpassword: None, + pghost: Some("127.0.0.1".to_string()), + pgport: Some(5432), + pguser: Some("filemanager".to_string()), + sqs_url: None, + paired_ingest_mode: false, + }; test_generate_iam_token(|config| async { IamGeneratorBuilder::default() .with_config(config) - .build() + .build(&env_config) .await .unwrap() }) diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/ingester.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/ingester.rs index d67a5f141..353c70a3a 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/ingester.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/ingester.rs @@ -7,14 +7,15 @@ use tracing::debug; use uuid::Uuid; use crate::database::{Client, CredentialGenerator}; +use crate::env::Config; use crate::error::Result; use crate::events::aws::message::EventType; use crate::events::aws::{StorageClass, TransposedS3EventMessages}; /// An ingester for S3 events. #[derive(Debug)] -pub struct Ingester<'a> { - pub(crate) client: Client<'a>, +pub struct Ingester { + pub(crate) client: Client, } /// The type representing an insert query. @@ -24,17 +25,18 @@ struct Insert { number_duplicate_events: i64, } -impl<'a> Ingester<'a> { +impl Ingester { /// Create a new ingester. - pub fn new(client: Client<'a>) -> Self { + pub fn new(client: Client) -> Self { Self { client } } /// Create a new ingester with a default database client. - pub async fn with_defaults(generator: Option<impl CredentialGenerator>) -> Result<Self> { - Ok(Self { - client: Client::from_generator(generator).await?, - }) + pub async fn with_defaults( + generator: Option<impl CredentialGenerator>, + config: &Config, + ) -> Result<Self> { + Ok(Self::new(Client::from_generator(generator, config).await?)) } /// Reprocess inserts to find object ids that are not duplicates from the inserted events.
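With the lifetime removed from `Ingester`, construction no longer borrows a long-lived `PgPool`; it either takes an owned `Client` or builds its own connection from `Config`. A rough usage sketch of the two paths, assuming the module paths implied by the imports above and `IamGenerator` as the concrete `CredentialGenerator` (the turbofish is only needed because no generator is passed here):

use filemanager::database::aws::credentials::IamGenerator;
use filemanager::database::aws::ingester::Ingester;
use filemanager::database::Client;
use filemanager::env::Config;
use filemanager::error::Result;

async fn build_ingester(config: &Config) -> Result<Ingester> {
    // Default path: connection options come from `Config` (DATABASE_URL or PG* variables).
    let _from_env = Ingester::with_defaults(None::<IamGenerator>, config).await?;

    // Explicit path: reuse an existing client; `Client` now wraps a cheaply cloneable
    // sea-orm `DatabaseConnection` instead of borrowing a `PgPool`.
    let client = Client::from_generator(None::<IamGenerator>, config).await?;
    Ok(Ingester::new(client))
}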
@@ -1374,7 +1376,7 @@ pub(crate) mod tests { events } - pub(crate) async fn fetch_results<'a>(client: &Client<'a>) -> (Vec<PgRow>, Vec<PgRow>) { + pub(crate) async fn fetch_results<'a>(client: &Client) -> (Vec<PgRow>, Vec<PgRow>) { ( sqlx::query("select * from object") .fetch_all(client.pool()) .await @@ -1387,7 +1389,7 @@ ) } - pub(crate) async fn fetch_results_ordered<'a>(client: &Client<'a>) -> Vec<PgRow> { + pub(crate) async fn fetch_results_ordered<'a>(client: &Client) -> Vec<PgRow> { sqlx::query("select * from s3_object order by sequencer, key, version_id") .fetch_all(client.pool()) .await @@ -1527,7 +1529,7 @@ update_test_events(expected_events_simple_delete_marker()) } - pub(crate) fn test_ingester<'a>(pool: PgPool) -> Client<'a> { - Client::new(pool) + pub(crate) fn test_ingester(pool: PgPool) -> Client { + Client::from_pool(pool) } } diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/ingester_paired.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/ingester_paired.rs index 58d44d9d4..ed8c2f5ff 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/ingester_paired.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/ingester_paired.rs @@ -8,6 +8,7 @@ use tracing::{debug, trace}; use uuid::Uuid; use crate::database::{Client, CredentialGenerator}; +use crate::env::Config; use crate::error::Result; use crate::events::aws::inventory::Inventory; use crate::events::aws::message::EventType; @@ -18,8 +19,8 @@ use crate::uuid::UuidGenerator; /// An ingester for S3 events. #[derive(Debug)] -pub struct IngesterPaired<'a> { - client: Client<'a>, +pub struct IngesterPaired { + client: Client, } /// The type representing an insert query. @@ -29,17 +30,18 @@ struct Insert { number_duplicate_events: i64, } -impl<'a> IngesterPaired<'a> { +impl IngesterPaired { /// Create a new ingester. - pub fn new(client: Client<'a>) -> Self { + pub fn new(client: Client) -> Self { Self { client } } /// Create a new ingester with a default database client. - pub async fn with_defaults(generator: Option<impl CredentialGenerator>) -> Result<Self> { - Ok(Self { - client: Client::from_generator(generator).await?, - }) + pub async fn with_defaults( + generator: Option<impl CredentialGenerator>, + config: &Config, + ) -> Result<Self> { + Ok(Self::new(Client::from_generator(generator, config).await?)) } fn reprocess_updated( @@ -1807,7 +1809,7 @@ pub(crate) mod tests { events } - pub(crate) fn test_ingester<'a>(pool: PgPool) -> Client<'a> { - Client::new(pool) + pub(crate) fn test_ingester(pool: PgPool) -> Client { + Client::from_pool(pool) } } diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/migration.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/migration.rs index 04843d483..1e855ee5e 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/migration.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/migration.rs @@ -7,26 +7,28 @@ use sqlx::migrate::Migrator; use tracing::trace; use crate::database::{Client, CredentialGenerator, Migrate}; +use crate::env::Config; use crate::error::Error::MigrateError; use crate::error::Result; /// A struct to perform database migrations. #[derive(Debug)] -pub struct Migration<'a> { - client: Client<'a>, +pub struct Migration { + client: Client, } -impl<'a> Migration<'a> { +impl Migration { /// Create a new migration.
- pub fn new(client: Client<'a>) -> Self { + pub fn new(client: Client) -> Self { Self { client } } /// Create a new migration with a default database client. - pub async fn with_defaults(generator: Option<impl CredentialGenerator>) -> Result<Self> { - Ok(Self { - client: Client::from_generator(generator).await?, - }) + pub async fn with_defaults( + generator: Option<impl CredentialGenerator>, + config: &Config, + ) -> Result<Self> { + Ok(Self::new(Client::from_generator(generator, config).await?)) } /// Get the underlying sqlx migrator for the migrations. @@ -41,7 +43,7 @@ impl<'a> Migration<'a> { } #[async_trait] -impl<'a> Migrate for Migration<'a> { +impl Migrate for Migration { async fn migrate(&self) -> Result<()> { trace!("applying migrations"); Self::migrator() @@ -54,7 +56,9 @@ #[cfg(test)] pub(crate) mod tests { use lazy_static::lazy_static; + use sqlx::postgres::PgRow; use sqlx::PgPool; + use sqlx::Row; use super::*; @@ -64,26 +68,30 @@ #[sqlx::test(migrations = false)] async fn test_migrate(pool: PgPool) { - let migrate = Migration::new(Client::new(pool)); + let migrate = Migration::new(Client::from_pool(pool)); - let not_exists = sqlx::query!( - "select exists (select from information_schema.tables where table_name = 'object')" - ) - .fetch_one(migrate.client.pool()) - .await - .unwrap(); + let object_exists = check_table_exists(&migrate, "object").await; + let s3_object_exists = check_table_exists(&migrate, "s3_object").await; - assert!(!not_exists.exists.unwrap()); + assert!(!object_exists.get::<bool, _>("exists")); + assert!(!s3_object_exists.get::<bool, _>("exists")); migrate.migrate().await.unwrap(); - let exists = sqlx::query!( - "select exists (select from information_schema.tables where table_name = 'object')" - ) - .fetch_one(migrate.client.pool()) - .await - .unwrap(); + let object_exists = check_table_exists(&migrate, "object").await; + let s3_object_exists = check_table_exists(&migrate, "s3_object").await; - assert!(exists.exists.unwrap()); + assert!(object_exists.get::<bool, _>("exists")); + assert!(s3_object_exists.get::<bool, _>("exists")); + } + + async fn check_table_exists(migration: &Migration, table_name: &str) -> PgRow { + sqlx::query(&format!( + "select exists (select from information_schema.tables where table_name = '{}')", + table_name + )) + .fetch_one(migration.client().pool()) + .await + .unwrap() + } } diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/query.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/query.rs index 53010daab..568f89908 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/query.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/aws/query.rs @@ -7,17 +7,17 @@ use crate::events::aws::{FlatS3EventMessage, FlatS3EventMessages, StorageClass}; /// Query the filemanager via REST interface. #[derive(Debug)] -pub struct Query<'a> { - client: Client<'a>, +pub struct Query { + client: Client, } pub struct QueryResults { _results: Vec<FlatS3EventMessage>, // FIXME: Adjust return type } -impl<'a> Query<'a> { +impl Query { /// Creates a new filemanager query client.
- pub fn new(client: Client<'a>) -> Self { + pub fn new(client: Client) -> Self { Self { client } } @@ -95,7 +95,7 @@ mod tests { #[sqlx::test(migrator = "MIGRATOR")] async fn test_select_existing_by_bucket_key(pool: PgPool) { let ingester = test_ingester(pool.clone()); - let query = Query::new(Client::new(pool)); + let query = Query::new(Client::from_pool(pool)); let events = test_events(Some(Created)); diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/mod.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/mod.rs index 6d4d34e8d..cde002207 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/database/mod.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/database/mod.rs @@ -1,18 +1,17 @@ //! This module handles connecting to the filemanager database for actions such as ingesting events. //! -use std::borrow::Cow; - use crate::database::aws::ingester::Ingester; use crate::database::aws::ingester_paired::IngesterPaired; +use crate::env::Config; use async_trait::async_trait; +use sea_orm::{ConnectOptions, Database, DatabaseConnection, SqlxPostgresConnector}; use sqlx::postgres::PgConnectOptions; -use sqlx::PgPool; +use sqlx::{ConnectOptions as SqlxConnectOptions, PgPool}; use tracing::debug; use crate::error::Result; use crate::events::EventSourceType; -use crate::read_env; pub mod aws; @@ -28,36 +27,50 @@ pub trait CredentialGenerator { /// A database client handles database interaction. #[derive(Debug, Clone)] -pub struct Client<'a> { +pub struct Client { // Use a Cow here to allow an owned pool or a shared reference to a pool. - pool: Cow<'a, PgPool>, + connection: DatabaseConnection, } -impl<'a> Client<'a> { +impl Client { /// Create a database from an existing pool. - pub fn new(pool: PgPool) -> Self { - Self { - pool: Cow::Owned(pool), - } + pub fn new(connection: DatabaseConnection) -> Self { + Self { connection } } - /// Create a database from a reference to an existing pool. - pub fn from_ref(pool: &'a PgPool) -> Self { - Self { - pool: Cow::Borrowed(pool), - } + /// Create a database connection from an existing pool. + pub fn from_pool(pool: PgPool) -> Self { + Self::new(SqlxPostgresConnector::from_sqlx_postgres_pool(pool)) } /// Create a database using default credential loading logic as defined in /// `Self::connect_options`. - pub async fn from_generator(generator: Option<impl CredentialGenerator>) -> Result<Self> { - Ok(Self::new(Self::create_pool(generator).await?)) + pub async fn from_generator( + generator: Option<impl CredentialGenerator>, + config: &Config, + ) -> Result<Self> { + Ok(Self::new(Self::create_pool(generator, config).await?)) } /// Create a database connection pool using credential loading logic defined in /// `Self::connect_options`. - pub async fn create_pool(generator: Option<impl CredentialGenerator>) -> Result<PgPool> { - Ok(PgPool::connect_with(Self::connect_options(generator).await?).await?) + pub async fn create_pool( + generator: Option<impl CredentialGenerator>, + config: &Config, + ) -> Result<DatabaseConnection> { + Ok(Database::connect(Self::connect_options(generator, config).await?).await?) + } + + /// Create database connect options using a series of credential loading logic. + pub async fn connect_options( + generator: Option<impl CredentialGenerator>, + config: &Config, + ) -> Result<ConnectOptions> { + Ok(ConnectOptions::new( + Self::pg_connect_options(generator, config) + .await? + .to_url_lossy(), )) } /// Create database connect options using a series of credential loading logic. @@ -65,15 +78,17 @@ /// First, this tries to load a DATABASE_URL environment variable to connect.
/// Then, it uses the generator if it is not None and PGPASSWORD is not set. /// Otherwise, uses default logic defined in PgConnectOptions::default. - pub async fn connect_options( + pub async fn pg_connect_options( generator: Option<impl CredentialGenerator>, + config: &Config, ) -> Result<PgConnectOptions> { // If the DATABASE_URL is defined, use that. - if let Ok(url) = read_env("DATABASE_URL") { + if let Some(url) = config.database_url() { return Ok(url.parse()?); } + // If PGPASSWORD is set, use default options. - if read_env("PGPASSWORD").is_ok() { + if config.pg_password().is_some() { return Ok(PgConnectOptions::default()); } @@ -89,21 +104,32 @@ /// Get the database pool. pub fn pool(&self) -> &PgPool { - &self.pool + self.connection.get_postgres_connection_pool() + } + + /// Get the database connection. Clones the underlying `DatabaseConnection` which is + /// intended to be cheaply cloneable because it represents an Arc to a shared connection pool. + pub fn connection(&self) -> DatabaseConnection { + self.connection.clone() + } + + /// Get the database connection as a reference. + pub fn connection_ref(&self) -> &DatabaseConnection { + &self.connection } } #[async_trait] -impl<'a> Ingest<'a> for Client<'a> { - async fn ingest(&'a self, events: EventSourceType) -> Result<()> { +impl Ingest for Client { + async fn ingest(&self, events: EventSourceType) -> Result<()> { match events { EventSourceType::S3(events) => { - Ingester::new(Self::from_ref(self.pool())) + Ingester::new(Self::new(self.connection())) .ingest_events(events) .await } EventSourceType::S3Paired(events) => { - IngesterPaired::new(Self::from_ref(self.pool())) + IngesterPaired::new(Self::new(self.connection())) .ingest_events(events) .await } @@ -113,9 +139,9 @@ } /// This trait ingests raw events into the database. #[async_trait] -pub trait Ingest<'a> { +pub trait Ingest { /// Ingest the events. - async fn ingest(&'a self, events: EventSourceType) -> Result<()>; + async fn ingest(&self, events: EventSourceType) -> Result<()>; } /// Trait representing database migrations. diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/env.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/env.rs new file mode 100644 index 000000000..63cbdfe21 --- /dev/null +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/env.rs @@ -0,0 +1,119 @@ +//! Handles loading environment variables as config options for filemanager. +//! + +use crate::error::Error::LoadingEnvironment; +use crate::error::Result; +use envy::from_env; +use serde::Deserialize; + +/// Configuration environment variables for filemanager. +#[derive(Debug, Deserialize, Default, Eq, PartialEq)] +pub struct Config { + pub(crate) database_url: Option<String>, + pub(crate) pgpassword: Option<String>, + pub(crate) pghost: Option<String>, + pub(crate) pgport: Option<u16>, + pub(crate) pguser: Option<String>, + #[serde(rename = "filemanager_sqs_url")] + pub(crate) sqs_url: Option<String>, + #[serde(default, rename = "filemanager_paired_ingest_mode")] + pub(crate) paired_ingest_mode: bool, +} + +impl Config { + /// Load environment variables into a `Config` struct. + pub fn load() -> Result<Self> { + Ok(from_env::<Self>()?) + } + + /// Get the database url. + pub fn database_url(&self) -> Option<&str> { + self.database_url.as_deref() + } + + /// Get the pg password. + pub fn pg_password(&self) -> Option<&str> { + self.pgpassword.as_deref() + } + + /// Get the pg host. + pub fn pg_host(&self) -> Option<&str> { + self.pghost.as_deref() + } + + /// Get the pg port.
+ pub fn pg_port(&self) -> Option<u16> { + self.pgport + } + + /// Get the pg user. + pub fn pg_user(&self) -> Option<&str> { + self.pguser.as_deref() + } + + /// Get the SQS url. + pub fn sqs_url(&self) -> Option<&str> { + self.sqs_url.as_deref() + } + + /// Get the paired ingest mode. + pub fn paired_ingest_mode(&self) -> bool { + self.paired_ingest_mode + } + + /// Get the value from an optional, or else try and get a different value, unwrapping into a Result. + pub fn value_or_else<T>(value: Option<T>, or_else: Option<T>) -> Result<T> { + value + .map(Ok) + .unwrap_or_else(|| Self::value_into_err(or_else)) + } + + /// Convert an optional value to a missing environment variable error. + pub fn value_into_err<T>(value: Option<T>) -> Result<T> { + value.ok_or_else(|| LoadingEnvironment("missing environment variable".to_string())) + } +} + +#[cfg(test)] +mod tests { + use envy::from_iter; + + use super::*; + + #[test] + fn test_environment() { + let data = vec![ + ("DATABASE_URL", "url"), + ("PGPASSWORD", "password"), + ("PGHOST", "host"), + ("PGPORT", "1234"), + ("PGUSER", "user"), + ("FILEMANAGER_SQS_URL", "url"), + ("FILEMANAGER_PAIRED_INGEST_MODE", "true"), + ] + .into_iter() + .map(|(key, value)| (key.to_string(), value.to_string())); + + let config: Config = from_iter(data).unwrap(); + + assert_eq!( + config, + Config { + database_url: Some("url".to_string()), + pgpassword: Some("password".to_string()), + pghost: Some("host".to_string()), + pgport: Some(1234), + pguser: Some("user".to_string()), + sqs_url: Some("url".to_string()), + paired_ingest_mode: true, + } + ) + } + + #[test] + fn test_environment_defaults() { + let config: Config = from_iter(vec![]).unwrap(); + + assert_eq!(config, Default::default()); + } +} diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/error.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/error.rs index 46c774ef3..dd41f96c6 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/error.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/error.rs @@ -1,6 +1,7 @@ //! Errors used by the filemanager crate. //!
+use sea_orm::DbErr; use std::result; use sqlx::migrate::MigrateError; @@ -19,10 +20,8 @@ pub enum Error { SQSError(String), #[error("deserialization error: `{0}`")] DeserializeError(String), - #[error("Missing environment variable: `{0}`")] - MissingEnvironmentVariable(String), - #[error("Invalid environment variable: `{0}`")] - InvalidEnvironmentVariable(String), + #[error("Loading environment variables: `{0}`")] + LoadingEnvironment(String), #[error("credential generator error: `{0}`")] CredentialGeneratorError(String), #[error("S3 inventory error: `{0}`")] @@ -35,6 +34,12 @@ impl From<sqlx::Error> for Error { } } +impl From<DbErr> for Error { + fn from(err: DbErr) -> Self { + Self::SQLError(err.to_string()) + } +} + impl From<MigrateError> for Error { fn from(err: MigrateError) -> Self { Self::SQLError(err.to_string()) @@ -46,3 +51,9 @@ impl From<serde_json::Error> for Error { Self::DeserializeError(err.to_string()) } } + +impl From<envy::Error> for Error { + fn from(error: envy::Error) -> Self { + Self::LoadingEnvironment(error.to_string()) + } +} diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/collecter.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/collecter.rs index 0993e4b67..e3fa279f2 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/collecter.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/collecter.rs @@ -17,14 +17,13 @@ use crate::clients::aws::s3::Client; use crate::clients::aws::s3::Client as S3Client; #[double] use crate::clients::aws::sqs::Client as SQSClient; -use crate::error::Error::InvalidEnvironmentVariable; +use crate::env::Config; use crate::error::Error::{DeserializeError, SQSError}; use crate::error::Result; use crate::events::aws::{ EventType, FlatS3EventMessage, FlatS3EventMessages, StorageClass, TransposedS3EventMessages, }; use crate::events::{Collect, EventSourceType}; -use crate::read_env; /// Build an AWS collector struct. #[derive(Default, Debug)] @@ -59,11 +58,11 @@ impl CollecterBuilder { } /// Build a collector using the raw events. - pub async fn build(self, raw_events: FlatS3EventMessages) -> Collecter { + pub async fn build(self, raw_events: FlatS3EventMessages, config: &Config) -> Collecter<'_> { if let Some(s3_client) = self.s3_client { - Collecter::new(s3_client, raw_events) + Collecter::new(s3_client, raw_events, config) } else { - Collecter::new(S3Client::with_defaults().await, raw_events) + Collecter::new(S3Client::with_defaults().await, raw_events, config) } } @@ -96,20 +95,21 @@ impl CollecterBuilder { } /// Build a collector by manually calling receive to obtain the raw events. - pub async fn build_receive(mut self) -> Result<Collecter> { + pub async fn build_receive(mut self, config: &Config) -> Result<Collecter<'_>> { let url = self.sqs_url.take(); - let url = if let Some(url) = url { - url - } else { - read_env("SQS_QUEUE_URL")? - }; + let url = Config::value_or_else(url.as_deref(), config.sqs_url())?; let client = self.sqs_client.take(); if let Some(sqs_client) = &client { - Ok(self.build(Self::receive(sqs_client, &url).await?).await) + Ok(self + .build(Self::receive(sqs_client, url).await?, config) + .await) } else { Ok(self - .build(Self::receive(&SQSClient::with_defaults().await, &url).await?) + .build( + Self::receive(&SQSClient::with_defaults().await, url).await?, + config, + ) + .await) } } @@ -117,20 +117,25 @@ impl CollecterBuilder { /// Collect raw events into the processed form which the database module accepts.
#[derive(Debug)] -pub struct Collecter { +pub struct Collecter<'a> { client: Client, raw_events: FlatS3EventMessages, + config: &'a Config, } -impl Collecter { +impl<'a> Collecter<'a> { /// Create a new collector. - pub(crate) fn new(client: Client, raw_events: FlatS3EventMessages) -> Self { - Self { client, raw_events } + pub(crate) fn new(client: Client, raw_events: FlatS3EventMessages, config: &'a Config) -> Self { + Self { + client, + raw_events, + config, + } } /// Get the inner values. - pub fn into_inner(self) -> (Client, FlatS3EventMessages) { - (self.client, self.raw_events) + pub fn into_inner(self) -> (Client, FlatS3EventMessages, &'a Config) { + (self.client, self.raw_events, self.config) } /// Converts an AWS datetime to a standard database format. @@ -204,35 +209,19 @@ impl Collecter { .collect::<Result<Vec<FlatS3EventMessage>>>()?, )) } - - /// Read the whether paired ingest mode should be used. - pub fn paired_ingest_mode() -> Result<bool> { - Ok(read_env("PAIRED_INGEST_MODE") - .ok() - .map(|value| { - if value == "1" { - Ok(true) - } else { - value.parse::<bool>() - } - }) - .transpose() - .map_err(|_| InvalidEnvironmentVariable("ingest paired mode".to_string()))? - .unwrap_or_default()) - } } #[async_trait] -impl Collect for Collecter { +impl<'a> Collect for Collecter<'a> { async fn collect(self) -> Result<EventSourceType> { - let (client, events) = self.into_inner(); + let (client, events, config) = self.into_inner(); let events = events.sort_and_dedup(); let events = Self::update_events(&client, events).await?; // Get only the known event types. let events = events.filter_known(); - if Self::paired_ingest_mode()? { + if config.paired_ingest_mode() { Ok(EventSourceType::S3Paired(events.into())) } else { Ok(EventSourceType::S3(TransposedS3EventMessages::from(events))) @@ -297,7 +286,7 @@ pub(crate) mod tests { .with_sqs_client(sqs_client) .with_s3_client(s3_client) .with_sqs_url("url") - .build_receive() + .build_receive(&Default::default()) .await .unwrap() .collect() @@ -319,7 +308,8 @@ #[tokio::test] async fn head() { - let mut collecter = test_collecter().await; + let config = Default::default(); + let mut collecter = test_collecter(&config).await; set_s3_client_expectations(&mut collecter.client, vec![|| Ok(expected_head_object())]); @@ -329,7 +319,8 @@ #[tokio::test] async fn head_not_found() { - let mut collecter = test_collecter().await; + let config = Default::default(); + let mut collecter = test_collecter(&config).await; set_s3_client_expectations( &mut collecter.client, @@ -342,7 +333,8 @@ #[tokio::test] async fn update_events() { - let mut collecter = test_collecter().await; + let config = Default::default(); + let mut collecter = test_collecter(&config).await; let events = expected_flat_events_simple().sort_and_dedup(); @@ -365,7 +357,8 @@ #[tokio::test] async fn collect() { - let mut collecter = test_collecter().await; + let config = Default::default(); + let mut collecter = test_collecter(&config).await; set_s3_client_expectations(&mut collecter.client, vec![|| Ok(expected_head_object())]); @@ -429,8 +422,8 @@ ) } - async fn test_collecter() -> Collecter { - Collecter::new(Client::default(), expected_flat_events_simple()) + async fn test_collecter(config: &Config) -> Collecter<'_> { + Collecter::new(Client::default(), expected_flat_events_simple(), config) } fn expected_receive_message() -> ReceiveMessageOutput { diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/mod.rs
b/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/mod.rs index 612f83aba..84e8ed7f5 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/mod.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/events/aws/mod.rs @@ -513,7 +513,7 @@ impl FlatS3EventMessage { self } - /// Set the object id. + /// Set the object group id. pub fn with_object_id(mut self, object_id: Uuid) -> Self { self.object_id = object_id; self diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/handlers/aws.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/handlers/aws.rs index 7fe04371a..f2e9cab9d 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/handlers/aws.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/handlers/aws.rs @@ -7,7 +7,7 @@ use aws_lambda_events::sqs::SqsEvent; use itertools::Itertools; use lambda_runtime::Error; use mockall_double::double; -use sqlx::PgPool; +use sea_orm::DatabaseConnection; use tracing::{debug, trace}; #[double] @@ -17,7 +17,8 @@ use crate::clients::aws::sqs::Client as SQSClient; use crate::database::aws::credentials::IamGeneratorBuilder; use crate::database::aws::query::Query; use crate::database::{Client, Ingest}; -use crate::events::aws::collecter::{Collecter, CollecterBuilder}; +use crate::env::Config as EnvConfig; +use crate::events::aws::collecter::CollecterBuilder; use crate::events::aws::inventory::{DiffMessages, Inventory, Manifest}; use crate::events::aws::message::EventType::Created; use crate::events::aws::{FlatS3EventMessages, TransposedS3EventMessages}; @@ -29,13 +30,14 @@ pub async fn receive_and_ingest( s3_client: S3Client, sqs_client: SQSClient, sqs_url: Option<impl Into<String>>, - database_client: Client<'_>, + database_client: Client, + env_config: &EnvConfig, ) -> Result<Client, Error> { let events = CollecterBuilder::default() .with_s3_client(s3_client) .with_sqs_client(sqs_client) .set_sqs_url(sqs_url) - .build_receive() + .build_receive(env_config) .await? .collect() .await?; @@ -48,7 +50,8 @@ pub async fn ingest_event( event: SqsEvent, s3_client: S3Client, - database_client: Client<'_>, + database_client: Client, + env_config: &EnvConfig, ) -> Result<Client, Error> { trace!("received event: {:?}", event); @@ -68,7 +71,7 @@ let events = CollecterBuilder::default() .with_s3_client(s3_client) - .build(events) + .build(events, env_config) .await .collect() .await?; @@ -82,12 +85,13 @@ /// Handle an S3 inventory for ingestion. pub async fn ingest_s3_inventory( s3_client: S3Client, - database_client: Client<'_>, + database_client: Client, bucket: Option<String>, key: Option<String>, manifest: Option<Manifest>, + env_config: &EnvConfig, ) -> Result<Client, Error> { - if Collecter::paired_ingest_mode()? { + if env_config.paired_ingest_mode() { return Err(Error::from( "paired ingest mode is not supported for S3 inventory".to_string(), )); @@ -164,16 +168,29 @@ } /// Create a postgres database pool using an IAM credential generator. -pub async fn create_database_pool() -> Result<PgPool, Error> { - Ok(Client::create_pool(Some(IamGeneratorBuilder::default().build().await?)).await?) +pub async fn create_database_pool(env_config: &EnvConfig) -> Result<DatabaseConnection, Error> { + Ok(Client::create_pool( + Some(IamGeneratorBuilder::default().build(env_config).await?), + env_config, + ) + .await?) } /// Update connection options with new credentials. /// Todo, replace this with sqlx `before_connect` once it is implemented.
-pub async fn update_credentials(pool: &PgPool) -> Result<(), Error> { - pool.set_connect_options( - Client::connect_options(Some(IamGeneratorBuilder::default().build().await?)).await?, - ); +pub async fn update_credentials( + connection: &DatabaseConnection, + env_config: &EnvConfig, +) -> Result<(), Error> { + connection + .get_postgres_connection_pool() + .set_connect_options( + Client::pg_connect_options( + Some(IamGeneratorBuilder::default().build(env_config).await?), + env_config, + ) + .await?, ); Ok(()) } @@ -205,6 +222,7 @@ mod tests { }; use crate::events::aws::FlatS3EventMessage; use crate::events::EventSourceType::S3; + use sqlx::PgPool; use super::*; @@ -216,9 +234,15 @@ mod tests { set_sqs_client_expectations(&mut sqs_client); set_s3_client_expectations(&mut s3_client, vec![|| Ok(expected_head_object())]); - let ingester = receive_and_ingest(s3_client, sqs_client, Some("url"), Client::new(pool)) - .await - .unwrap(); + let ingester = receive_and_ingest( + s3_client, + sqs_client, + Some("url"), + Client::from_pool(pool), + &Default::default(), + ) + .await + .unwrap(); let (object_results, s3_object_results) = fetch_results(&ingester).await; @@ -256,9 +280,14 @@ mod tests { }], }; - let ingester = ingest_event(event, s3_client, Client::new(pool)) - .await - .unwrap(); + let ingester = ingest_event( + event, + s3_client, + Client::from_pool(pool), + &Default::default(), + ) + .await + .unwrap(); let (object_results, s3_object_results) = fetch_results(&ingester).await; @@ -294,10 +323,11 @@ mod tests { let ingester = ingest_s3_inventory( client, - Client::new(pool.clone()), + Client::from_pool(pool.clone()), Some(MANIFEST_BUCKET.to_string()), Some("manifest.json".to_string()), None, + &Default::default(), ) .await .unwrap(); @@ -365,10 +395,11 @@ mod tests { ingest_s3_inventory( client, - Client::new(pool.clone()), + Client::from_pool(pool.clone()), Some(MANIFEST_BUCKET.to_string()), Some("manifest.json".to_string()), None, + &Default::default(), ) .await .unwrap(); @@ -429,10 +460,11 @@ mod tests { ingest_s3_inventory( client, - Client::new(pool.clone()), + Client::from_pool(pool.clone()), Some(MANIFEST_BUCKET.to_string()), Some("manifest.json".to_string()), None, + &Default::default(), ) .await .unwrap(); diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/lib.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/lib.rs index 423cffb48..1eee5ba5e 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/lib.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/lib.rs @@ -2,17 +2,12 @@ //! logic. //! -use crate::error::Error::MissingEnvironmentVariable; -use crate::error::Result; - pub mod clients; pub mod database; +pub mod env; pub mod error; pub mod events; pub mod handlers; +pub mod queries; +pub mod routes; pub mod uuid; - -/// Read an environment variable into a string. -pub fn read_env<K: AsRef<str>>(key: K) -> Result<String> { - std::env::var(key.as_ref()).map_err(|err| MissingEnvironmentVariable(err.to_string())) -} diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/get.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/get.rs new file mode 100644 index 000000000..fd397d5b0 --- /dev/null +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/get.rs @@ -0,0 +1,86 @@ +//! Query builder involving get operations on the database. +//!
+ +use sea_orm::{EntityTrait, Select}; +use uuid::Uuid; + +use crate::database::entities::object::Entity as ObjectEntity; +use crate::database::entities::object::Model as Object; +use crate::database::entities::s3_object::Entity as S3ObjectEntity; +use crate::database::entities::s3_object::Model as S3Object; +use crate::database::Client; +use crate::error::Result; + +/// A query builder for get operations. +pub struct GetQueryBuilder<'a> { + client: &'a Client, +} + +impl<'a> GetQueryBuilder<'a> { + /// Create a new query builder. + pub fn new(client: &'a Client) -> Self { + Self { client } + } + + /// Build a select query for finding an object by id. + pub fn build_object_by_id(id: Uuid) -> Select<ObjectEntity> { + ObjectEntity::find_by_id(id) + } + + /// Build a select query for finding an s3 object by id. + pub fn build_s3_object_by_id(id: Uuid) -> Select<S3ObjectEntity> { + S3ObjectEntity::find_by_id(id) + } + + /// Get a specific object by id. + pub async fn get_object(&self, id: Uuid) -> Result<Option<Object>> { + Ok(Self::build_object_by_id(id) + .one(self.client.connection_ref()) + .await?) + } + + /// Get a specific s3 object by id. + pub async fn get_s3_object_by_id(&self, id: Uuid) -> Result<Option<S3Object>> { + Ok(Self::build_s3_object_by_id(id) + .one(self.client.connection_ref()) + .await?) + } +} + +#[cfg(test)] +mod tests { + use sqlx::PgPool; + + use crate::database::aws::migration::tests::MIGRATOR; + use crate::database::Client; + use crate::queries::tests::initialize_database; + + use super::*; + + #[sqlx::test(migrator = "MIGRATOR")] + async fn test_get_object(pool: PgPool) { + let client = Client::from_pool(pool); + let entries = initialize_database(&client, 10).await; + + let first = entries.first().unwrap(); + let builder = GetQueryBuilder::new(&client); + let result = builder.get_object(first.0.object_id).await.unwrap(); + + assert_eq!(result.as_ref(), Some(&first.0)); + } + + #[sqlx::test(migrator = "MIGRATOR")] + async fn test_list_s3_objects(pool: PgPool) { + let client = Client::from_pool(pool); + let entries = initialize_database(&client, 10).await; + + let first = entries.first().unwrap(); + let builder = GetQueryBuilder::new(&client); + let result = builder + .get_s3_object_by_id(first.1.s3_object_id) + .await + .unwrap(); + + assert_eq!(result.as_ref(), Some(&first.1)); + } +} diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/list.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/list.rs new file mode 100644 index 000000000..c6a28ffa7 --- /dev/null +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/list.rs @@ -0,0 +1,128 @@ +//! Query builder involving list operations on the database. +//! + +use sea_orm::{EntityTrait, PaginatorTrait, Select}; + +use crate::database::entities::object::Entity as ObjectEntity; +use crate::database::entities::object::Model as Object; +use crate::database::entities::s3_object::Entity as S3ObjectEntity; +use crate::database::entities::s3_object::Model as S3Object; +use crate::database::Client; +use crate::error::Result; + +/// A query builder for list operations. +pub struct ListQueryBuilder<'a> { + client: &'a Client, +} + +impl<'a> ListQueryBuilder<'a> { + /// Create a new query builder. + pub fn new(client: &'a Client) -> Self { + Self { client } + } + + /// Build a select query for finding values from objects. + pub fn build_object() -> Select<ObjectEntity> { + ObjectEntity::find() + } + + /// Build a select query for finding values from s3 objects.
+ pub fn build_s3_object() -> Select<S3ObjectEntity> { + S3ObjectEntity::find() + } + + /// Find all objects. + pub async fn list_objects(&self) -> Result<Vec<Object>> { + Ok(Self::build_object() + .all(self.client.connection_ref()) + .await?) + } + + /// Find all s3 objects. + pub async fn list_s3_objects(&self) -> Result<Vec<S3Object>> { + Ok(Self::build_s3_object() + .all(self.client.connection_ref()) + .await?) + } + + /// Count objects. + pub async fn count_objects(&self) -> Result<u64> { + Ok(Self::build_object() + .count(self.client.connection_ref()) + .await?) + } + + /// Count s3 objects. + pub async fn count_s3_objects(&self) -> Result<u64> { + Ok(Self::build_s3_object() + .count(self.client.connection_ref()) + .await?) + } +} + +#[cfg(test)] +mod tests { + use sqlx::PgPool; + + use crate::database::aws::migration::tests::MIGRATOR; + use crate::database::Client; + use crate::queries::tests::initialize_database; + + use super::*; + + #[sqlx::test(migrator = "MIGRATOR")] + async fn test_list_objects(pool: PgPool) { + let client = Client::from_pool(pool); + let entries = initialize_database(&client, 10).await; + + let builder = ListQueryBuilder::new(&client); + let result = builder.list_objects().await.unwrap(); + + assert_eq!( + result, + entries + .into_iter() + .map(|(entry, _)| entry) + .collect::<Vec<_>>() + ); + } + + #[sqlx::test(migrator = "MIGRATOR")] + async fn test_list_s3_objects(pool: PgPool) { + let client = Client::from_pool(pool); + let entries = initialize_database(&client, 10).await; + + let builder = ListQueryBuilder::new(&client); + let result = builder.list_s3_objects().await.unwrap(); + + assert_eq!( + result, + entries + .into_iter() + .map(|(_, entry)| entry) + .collect::<Vec<_>>() + ); + } + + #[sqlx::test(migrator = "MIGRATOR")] + async fn test_count_objects(pool: PgPool) { + let client = Client::from_pool(pool); + initialize_database(&client, 10).await; + + let builder = ListQueryBuilder::new(&client); + let result = builder.count_objects().await.unwrap(); + + assert_eq!(result, 10); + } + + #[sqlx::test(migrator = "MIGRATOR")] + async fn test_count_s3_objects(pool: PgPool) { + let client = Client::from_pool(pool); + initialize_database(&client, 10).await; + + let builder = ListQueryBuilder::new(&client); + let result = builder.count_s3_objects().await.unwrap(); + + assert_eq!(result, 10); + } +} diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/mod.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/mod.rs new file mode 100644 index 000000000..05fbe44a0 --- /dev/null +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/mod.rs @@ -0,0 +1,102 @@ +//! This module handles all logic related to querying the file manager through APIs/events. +//! + +pub mod get; +pub mod list; + +#[cfg(test)] +pub(crate) mod tests { + use std::ops::Add; + + use chrono::{DateTime, Days}; + use sea_orm::Set; + use sea_orm::{ActiveModelTrait, TryIntoModel}; + use strum::EnumCount; + + use crate::database::entities::object::ActiveModel as ActiveObject; + use crate::database::entities::object::Model as Object; + use crate::database::entities::s3_object::ActiveModel as ActiveS3Object; + use crate::database::entities::s3_object::Model as S3Object; + use crate::database::entities::sea_orm_active_enums::{EventType, StorageClass}; + use crate::database::Client; + use crate::uuid::UuidGenerator; + + /// Initialize database state for testing.
+ pub(crate) async fn initialize_database(client: &Client, n: usize) -> Vec<(Object, S3Object)> { + let mut output = vec![]; + + for index in 0..n { + let (object, s3_object) = generate_entry(index); + + object + .clone() + .insert(client.connection_ref()) + .await + .unwrap(); + s3_object + .clone() + .insert(client.connection_ref()) + .await + .unwrap(); + + output.push(( + object.try_into_model().unwrap(), + s3_object.try_into_model().unwrap(), + )); + } + + output + } + + pub(crate) fn generate_entry(index: usize) -> (ActiveObject, ActiveS3Object) { + let object_id = UuidGenerator::generate(); + let event = event_type(index); + let date = || Set(Some(DateTime::default().add(Days::new(index as u64)))); + + ( + ActiveObject { + object_id: Set(object_id), + attributes: Set(None), + }, + ActiveS3Object { + s3_object_id: Set(UuidGenerator::generate()), + object_id: Set(object_id), + public_id: Set(UuidGenerator::generate()), + event_type: Set(event.clone()), + // Half as many buckets as keys. + bucket: Set((index / 2).to_string()), + key: Set(index.to_string()), + version_id: Set(index.to_string()), + date: date(), + size: Set(Some(index as i64)), + sha256: Set(Some(index.to_string())), + last_modified_date: date(), + e_tag: Set(Some(index.to_string())), + storage_class: Set(Some(storage_class(index))), + sequencer: Set(Some(index.to_string())), + is_delete_marker: Set(false), + number_duplicate_events: Set(0), + attributes: Set(None), + deleted_date: if event == EventType::Deleted { + date() + } else { + Set(None) + }, + deleted_sequencer: if event == EventType::Deleted { + Set(Some(index.to_string())) + } else { + Set(None) + }, + number_reordered: Set(0), + }, + ) + } + + pub(crate) fn event_type(index: usize) -> EventType { + EventType::from_repr((index % (EventType::COUNT - 1)) as u8).unwrap() + } + + pub(crate) fn storage_class(index: usize) -> StorageClass { + StorageClass::from_repr((index % StorageClass::COUNT) as u8).unwrap() + } +} diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/get.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/get.rs new file mode 100644 index 000000000..ecc0b7bab --- /dev/null +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/get.rs @@ -0,0 +1,115 @@ +//! Route logic for get API calls. +//! + +use axum::extract::{Path, State}; +use axum::Json; +use serde::Deserialize; +use uuid::Uuid; + +use crate::database::entities::object::Model as Object; +use crate::database::entities::s3_object::Model as S3Object; +use crate::error::Result; +use crate::queries::get::GetQueryBuilder; +use crate::routes::AppState; + +/// Params for a get object by id request. +#[derive(Debug, Deserialize)] +pub struct GetObjectById {} + +/// The get object handler. +pub async fn get_object_by_id( + state: State<AppState>, + Path(id): Path<Uuid>, +) -> Result<Json<Option<Object>>> { + let query = GetQueryBuilder::new(&state.client); + + Ok(Json(query.get_object(id).await?)) +} + +/// Params for a get s3 objects by id request. +#[derive(Debug, Deserialize)] +pub struct GetS3ObjectById {} + +/// The get s3 objects handler.
diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/get.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/get.rs
new file mode 100644
index 000000000..ecc0b7bab
--- /dev/null
+++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/get.rs
@@ -0,0 +1,115 @@
+//! Route logic for get API calls.
+//!
+
+use axum::extract::{Path, State};
+use axum::Json;
+use serde::Deserialize;
+use uuid::Uuid;
+
+use crate::database::entities::object::Model as Object;
+use crate::database::entities::s3_object::Model as S3Object;
+use crate::error::Result;
+use crate::queries::get::GetQueryBuilder;
+use crate::routes::AppState;
+
+/// Params for a get object by id request.
+#[derive(Debug, Deserialize)]
+pub struct GetObjectById {}
+
+/// The get object handler.
+pub async fn get_object_by_id(
+    state: State<AppState>,
+    Path(id): Path<Uuid>,
+) -> Result<Json<Option<Object>>> {
+    let query = GetQueryBuilder::new(&state.client);
+
+    Ok(Json(query.get_object(id).await?))
+}
+
+/// Params for a get s3 object by id request.
+#[derive(Debug, Deserialize)]
+pub struct GetS3ObjectById {}
+
+/// The get s3 object handler.
+pub async fn get_s3_object_by_id(
+    state: State<AppState>,
+    Path(id): Path<Uuid>,
+) -> Result<Json<Option<S3Object>>> {
+    let query = GetQueryBuilder::new(&state.client);
+
+    Ok(Json(query.get_s3_object_by_id(id).await?))
+}
+
+#[cfg(test)]
+mod tests {
+    use axum::body::to_bytes;
+    use axum::body::Body;
+    use axum::http::Request;
+    use parquet::data_type::AsBytes;
+    use serde_json::from_slice;
+    use sqlx::PgPool;
+    use tower::ServiceExt;
+
+    use crate::database::aws::migration::tests::MIGRATOR;
+    use crate::database::entities::object::Model as Object;
+    use crate::database::entities::s3_object::Model as S3Object;
+    use crate::database::Client;
+    use crate::queries::tests::initialize_database;
+    use crate::routes::query_router;
+
+    #[sqlx::test(migrator = "MIGRATOR")]
+    async fn get_objects_api(pool: PgPool) {
+        let client = Client::from_pool(pool);
+        let entries = initialize_database(&client, 10).await;
+
+        let first = entries.first().unwrap();
+        let app = query_router(client);
+        let response = app
+            .oneshot(
+                Request::builder()
+                    .uri(format!("/objects/{}", first.0.object_id))
+                    .body(Body::empty())
+                    .unwrap(),
+            )
+            .await
+            .unwrap();
+
+        let result = from_slice::<Object>(
+            to_bytes(response.into_body(), usize::MAX)
+                .await
+                .unwrap()
+                .as_bytes(),
+        )
+        .unwrap();
+
+        assert_eq!(result, first.0);
+    }
+
+    #[sqlx::test(migrator = "MIGRATOR")]
+    async fn get_s3_objects_api(pool: PgPool) {
+        let client = Client::from_pool(pool);
+        let entries = initialize_database(&client, 10).await;
+
+        let first = entries.first().unwrap();
+        let app = query_router(client);
+        let response = app
+            .oneshot(
+                Request::builder()
+                    .uri(format!("/s3_objects/{}", first.1.s3_object_id))
+                    .body(Body::empty())
+                    .unwrap(),
+            )
+            .await
+            .unwrap();
+
+        let result = from_slice::<S3Object>(
+            to_bytes(response.into_body(), usize::MAX)
+                .await
+                .unwrap()
+                .as_bytes(),
+        )
+        .unwrap();
+
+        assert_eq!(result, first.1);
+    }
+}
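Note that both handlers return `Result<Json<Option<...>>>`, so a lookup for an id that does not exist responds `200 OK` with a body of `null` rather than a `404`. A hedged sketch of a test that would pin that behaviour down — inferred from the handler types, not verified in this diff; it reuses the same test imports as above plus `uuid::Uuid`:

```rust
#[sqlx::test(migrator = "MIGRATOR")]
async fn get_object_unknown_id(pool: PgPool) {
    let app = query_router(Client::from_pool(pool));
    let response = app
        .oneshot(
            Request::builder()
                .uri(format!("/objects/{}", uuid::Uuid::new_v4()))
                .body(Body::empty())
                .unwrap(),
        )
        .await
        .unwrap();

    // `None` serialises to JSON `null`.
    let body = to_bytes(response.into_body(), usize::MAX).await.unwrap();
    assert_eq!(from_slice::<Option<Object>>(body.as_bytes()).unwrap(), None);
}
```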
diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/list.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/list.rs
new file mode 100644
index 000000000..220bc8e74
--- /dev/null
+++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/list.rs
@@ -0,0 +1,186 @@
+//! Route logic for list API calls.
+//!
+
+use axum::extract::State;
+use axum::Json;
+use serde::Deserialize;
+
+use crate::database::entities::object::Model as Object;
+use crate::database::entities::s3_object::Model as S3Object;
+use crate::error::Result;
+use crate::queries::list::ListQueryBuilder;
+use crate::routes::AppState;
+
+/// Params for a list objects request.
+#[derive(Debug, Deserialize)]
+pub struct ListObjectsParams {}
+
+/// The list objects handler.
+pub async fn list_objects(state: State<AppState>) -> Result<Json<Vec<Object>>> {
+    let query = ListQueryBuilder::new(&state.client);
+
+    Ok(Json(query.list_objects().await?))
+}
+
+/// The count objects handler.
+pub async fn count_objects(state: State<AppState>) -> Result<Json<u64>> {
+    let query = ListQueryBuilder::new(&state.client);
+
+    Ok(Json(query.count_objects().await?))
+}
+
+/// Params for a list s3 objects request.
+#[derive(Debug, Deserialize)]
+pub struct ListS3ObjectsParams {}
+
+/// The list s3 objects handler.
+pub async fn list_s3_objects(state: State<AppState>) -> Result<Json<Vec<S3Object>>> {
+    let query = ListQueryBuilder::new(&state.client);
+
+    Ok(Json(query.list_s3_objects().await?))
+}
+
+/// The count s3 objects handler.
+pub async fn count_s3_objects(state: State<AppState>) -> Result<Json<u64>> {
+    let query = ListQueryBuilder::new(&state.client);
+
+    Ok(Json(query.count_s3_objects().await?))
+}
+
+#[cfg(test)]
+mod tests {
+    use axum::body::to_bytes;
+    use axum::body::Body;
+    use axum::http::Request;
+    use parquet::data_type::AsBytes;
+    use serde_json::from_slice;
+    use sqlx::PgPool;
+    use tower::ServiceExt;
+
+    use crate::database::aws::migration::tests::MIGRATOR;
+    use crate::database::entities::object::Model as Object;
+    use crate::database::entities::s3_object::Model as S3Object;
+    use crate::database::Client;
+    use crate::queries::tests::initialize_database;
+    use crate::routes::query_router;
+
+    #[sqlx::test(migrator = "MIGRATOR")]
+    async fn list_objects_api(pool: PgPool) {
+        let client = Client::from_pool(pool);
+        let entries = initialize_database(&client, 10).await;
+
+        let app = query_router(client);
+        let response = app
+            .oneshot(
+                Request::builder()
+                    .uri("/objects")
+                    .body(Body::empty())
+                    .unwrap(),
+            )
+            .await
+            .unwrap();
+
+        let result = from_slice::<Vec<Object>>(
+            to_bytes(response.into_body(), usize::MAX)
+                .await
+                .unwrap()
+                .as_bytes(),
+        )
+        .unwrap();
+
+        assert_eq!(
+            result,
+            entries
+                .into_iter()
+                .map(|(entry, _)| entry)
+                .collect::<Vec<_>>()
+        );
+    }
+
+    #[sqlx::test(migrator = "MIGRATOR")]
+    async fn list_s3_objects_api(pool: PgPool) {
+        let client = Client::from_pool(pool);
+        let entries = initialize_database(&client, 10).await;
+
+        let app = query_router(client);
+        let response = app
+            .oneshot(
+                Request::builder()
+                    .uri("/s3_objects")
+                    .body(Body::empty())
+                    .unwrap(),
+            )
+            .await
+            .unwrap();
+
+        let result = from_slice::<Vec<S3Object>>(
+            to_bytes(response.into_body(), usize::MAX)
+                .await
+                .unwrap()
+                .as_bytes(),
+        )
+        .unwrap();
+
+        assert_eq!(
+            result,
+            entries
+                .into_iter()
+                .map(|(_, entry)| entry)
+                .collect::<Vec<_>>()
+        );
+    }
+
+    #[sqlx::test(migrator = "MIGRATOR")]
+    async fn count_objects_api(pool: PgPool) {
+        let client = Client::from_pool(pool);
+        initialize_database(&client, 10).await;
+
+        let app = query_router(client);
+        let response = app
+            .oneshot(
+                Request::builder()
+                    .uri("/objects/count")
+                    .body(Body::empty())
+                    .unwrap(),
+            )
+            .await
+            .unwrap();
+
+        let result = from_slice::<u64>(
+            to_bytes(response.into_body(), usize::MAX)
+                .await
+                .unwrap()
+                .as_bytes(),
+        )
+        .unwrap();
+
+        assert_eq!(result, 10);
+    }
+
+    #[sqlx::test(migrator = "MIGRATOR")]
+    async fn count_s3_objects_api(pool: PgPool) {
+        let client = Client::from_pool(pool);
+        initialize_database(&client, 10).await;
+
+        let app = query_router(client);
+        let response = app
+            .oneshot(
+                Request::builder()
+                    .uri("/s3_objects/count")
+                    .body(Body::empty())
+                    .unwrap(),
+            )
+            .await
+            .unwrap();
+
+        let result = from_slice::<u64>(
+            to_bytes(response.into_body(), usize::MAX)
+                .await
+                .unwrap()
+                .as_bytes(),
+        )
+        .unwrap();
+
+        assert_eq!(result, 10);
+    }
+}
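The empty `ListObjectsParams` and `ListS3ObjectsParams` structs are declared but never extracted, which reads as a placeholder for future filtering. A sketch of how a params struct would plug in once it has fields, using axum's `Query` extractor — illustrative only; the `bucket` filter below is hypothetical and not in this diff:

```rust
use axum::extract::{Query, State};

/// Hypothetical params with a single filter field.
#[derive(Debug, Deserialize)]
pub struct ListObjectsParams {
    bucket: Option<String>,
}

/// The list objects handler, now accepting query parameters.
pub async fn list_objects(
    state: State<AppState>,
    Query(params): Query<ListObjectsParams>,
) -> Result<Json<Vec<Object>>> {
    let query = ListQueryBuilder::new(&state.client);

    // A real implementation would thread `params.bucket` into the select
    // statement; here the filter is accepted but not yet applied.
    let _ = params.bucket;
    Ok(Json(query.list_objects().await?))
}
```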
diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/mod.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/mod.rs
new file mode 100644
index 000000000..66df95d80
--- /dev/null
+++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/mod.rs
@@ -0,0 +1,89 @@
+//! This module handles API routing.
+//!
+
+use axum::http::StatusCode;
+use axum::response::{IntoResponse, Response};
+use axum::routing::get;
+use axum::{Json, Router};
+use serde::Serialize;
+
+use crate::database::Client;
+use crate::error::Error;
+use crate::routes::get::{get_object_by_id, get_s3_object_by_id};
+use crate::routes::list::{count_objects, count_s3_objects, list_objects, list_s3_objects};
+
+pub mod get;
+pub mod list;
+
+/// State used by the router, holding the database client.
+#[derive(Debug, Clone)]
+pub struct AppState {
+    client: Client,
+}
+
+/// The main filemanager router for read-only queries.
+pub fn query_router(database: Client) -> Router {
+    let state = AppState { client: database };
+
+    Router::new()
+        .route("/objects", get(list_objects))
+        .route("/objects/:id", get(get_object_by_id))
+        .route("/objects/count", get(count_objects))
+        .route("/s3_objects", get(list_s3_objects))
+        .route("/s3_objects/:id", get(get_s3_object_by_id))
+        .route("/s3_objects/count", get(count_s3_objects))
+        .with_state(state)
+}
+
+/// The error response format returned in the API.
+#[derive(Serialize)]
+pub struct ErrorResponse {
+    message: String,
+}
+
+impl IntoResponse for Error {
+    fn into_response(self) -> Response {
+        let (status, message) = match self {
+            Error::SQLError(err) => (StatusCode::NOT_FOUND, err.to_string()),
+            _ => (
+                StatusCode::INTERNAL_SERVER_ERROR,
+                "unexpected error".to_string(),
+            ),
+        };
+
+        (status, Json(ErrorResponse { message })).into_response()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use axum::body::to_bytes;
+    use axum::http::StatusCode;
+    use axum::response::IntoResponse;
+    use parquet::data_type::AsBytes;
+    use serde_json::{from_slice, json, Value};
+
+    use crate::error::Error;
+
+    #[tokio::test]
+    async fn sql_error_into_response() {
+        let response = Error::SQLError("error".to_string()).into_response();
+        assert_eq!(response.status(), StatusCode::NOT_FOUND);
+
+        assert_eq!(
+            json!({"message": "error"}),
+            from_slice::<Value>(
+                to_bytes(response.into_body(), usize::MAX)
+                    .await
+                    .unwrap()
+                    .as_bytes()
+            )
+            .unwrap()
+        );
+    }
+
+    #[tokio::test]
+    async fn internal_error_into_response() {
+        let response = Error::MigrateError("error".to_string()).into_response();
+        assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);
+    }
+}
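Two closing notes on the router. First, axum's underlying `matchit` router gives static segments priority over captures, so `/objects/count` is matched ahead of `/objects/:id` regardless of registration order — `GET /objects/count` is never parsed as a UUID. Second, serving the router takes only a few lines; a sketch, assuming a tokio runtime and an already-constructed `PgPool` (the bind address is illustrative):

```rust
use filemanager::database::Client;
use filemanager::routes::query_router;

// Bind a listener and hand the router to axum 0.7's `serve`.
async fn serve(pool: sqlx::PgPool) -> std::io::Result<()> {
    let app = query_router(Client::from_pool(pool));
    let listener = tokio::net::TcpListener::bind("0.0.0.0:8080").await?;

    axum::serve(listener, app).await
}
```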