From 908270675884a9374c2e2be60064cae798a903a9 Mon Sep 17 00:00:00 2001 From: Marko Malenic Date: Mon, 8 Jul 2024 09:34:49 +1000 Subject: [PATCH 1/2] fix(filemanager): order list operations by sequencer by default --- .../stateless/stacks/filemanager/Cargo.lock | 1 + .../stacks/filemanager/filemanager/Cargo.toml | 1 + .../filemanager/src/queries/list.rs | 11 ++++--- .../filemanager/src/queries/mod.rs | 32 +++++++++++++++++-- .../filemanager/src/routes/list.rs | 8 ++--- 5 files changed, 42 insertions(+), 11 deletions(-) diff --git a/lib/workload/stateless/stacks/filemanager/Cargo.lock b/lib/workload/stateless/stacks/filemanager/Cargo.lock index 676148d23..42bef5cdb 100644 --- a/lib/workload/stateless/stacks/filemanager/Cargo.lock +++ b/lib/workload/stateless/stacks/filemanager/Cargo.lock @@ -2145,6 +2145,7 @@ dependencies = [ "mockall_double", "orc-rust", "parquet", + "rand", "sea-orm", "serde", "serde_json", diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/Cargo.toml b/lib/workload/stateless/stacks/filemanager/filemanager/Cargo.toml index 37e50663b..cac9ea573 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/Cargo.toml +++ b/lib/workload/stateless/stacks/filemanager/filemanager/Cargo.toml @@ -69,6 +69,7 @@ aws_lambda_events = "0.15" [dev-dependencies] lazy_static = "1" +rand = "0.8" aws-smithy-runtime-api = "1" aws-smithy-mocks-experimental = "0.2" diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/list.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/list.rs index 0d02d704f..b273b0331 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/list.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/list.rs @@ -1,9 +1,10 @@ //! Query builder involving list operations on the database. //! -use sea_orm::{EntityTrait, FromQueryResult, PaginatorTrait, QuerySelect, Select}; +use sea_orm::{EntityTrait, FromQueryResult, PaginatorTrait, QueryOrder, QuerySelect, Select}; use crate::database::entities::object::Entity as ObjectEntity; +use crate::database::entities::s3_object::Column as S3Column; use crate::database::entities::s3_object::Entity as S3ObjectEntity; use crate::database::Client; use crate::error::Error::OverflowError; @@ -46,7 +47,7 @@ impl<'a> ListQueryBuilder<'a, S3ObjectEntity> { /// Build a select query for finding values from s3 objects. pub fn build_object() -> Select { - S3ObjectEntity::find() + S3ObjectEntity::find().order_by_asc(S3Column::Sequencer) } } @@ -117,7 +118,7 @@ mod tests { use crate::database::aws::migration::tests::MIGRATOR; use crate::database::Client; - use crate::queries::tests::initialize_database; + use crate::queries::tests::{initialize_database, initialize_database_reorder}; use super::*; @@ -175,7 +176,7 @@ mod tests { #[sqlx::test(migrator = "MIGRATOR")] async fn test_list_s3_objects(pool: PgPool) { let client = Client::from_pool(pool); - let entries = initialize_database(&client, 10).await; + let entries = initialize_database_reorder(&client, 10).await; let builder = ListQueryBuilder::::new(&client); let result = builder.all().await.unwrap(); @@ -192,7 +193,7 @@ mod tests { #[sqlx::test(migrator = "MIGRATOR")] async fn test_paginate_s3_objects(pool: PgPool) { let client = Client::from_pool(pool); - let entries = initialize_database(&client, 10).await; + let entries = initialize_database_reorder(&client, 10).await; let builder = ListQueryBuilder::::new(&client); diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/mod.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/mod.rs index 05fbe44a0..0899c8a06 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/mod.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/queries/mod.rs @@ -9,6 +9,8 @@ pub(crate) mod tests { use std::ops::Add; use chrono::{DateTime, Days}; + use rand::seq::SliceRandom; + use rand::thread_rng; use sea_orm::Set; use sea_orm::{ActiveModelTrait, TryIntoModel}; use strum::EnumCount; @@ -21,13 +23,39 @@ pub(crate) mod tests { use crate::database::Client; use crate::uuid::UuidGenerator; + /// Initialize the database state for testing and shuffle entries to simulate + /// out of order events. + pub(crate) async fn initialize_database_reorder( + client: &Client, + n: usize, + ) -> Vec<(Object, S3Object)> { + let mut data = initialize_database_with_shuffle(client, n, true).await; + + // Return the correct ordering for test purposes + data.sort_by(|(_, a), (_, b)| a.sequencer.cmp(&b.sequencer)); + + data + } + /// Initialize database state for testing. pub(crate) async fn initialize_database(client: &Client, n: usize) -> Vec<(Object, S3Object)> { + initialize_database_with_shuffle(client, n, false).await + } + + async fn initialize_database_with_shuffle( + client: &Client, + n: usize, + shuffle: bool, + ) -> Vec<(Object, S3Object)> { let mut output = vec![]; - for index in 0..n { - let (object, s3_object) = generate_entry(index); + let mut entries: Vec<_> = (0..n).map(generate_entry).collect(); + + if shuffle { + entries.shuffle(&mut thread_rng()); + } + for (object, s3_object) in entries { object .clone() .insert(client.connection_ref()) diff --git a/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/list.rs b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/list.rs index 5cc26ddc8..a24e86609 100644 --- a/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/list.rs +++ b/lib/workload/stateless/stacks/filemanager/filemanager/src/routes/list.rs @@ -222,7 +222,7 @@ mod tests { use crate::database::aws::migration::tests::MIGRATOR; use crate::database::entities::object::Model as Object; use crate::database::entities::s3_object::Model as S3Object; - use crate::queries::tests::initialize_database; + use crate::queries::tests::{initialize_database, initialize_database_reorder}; use crate::routes::list::{ListCount, ListResponse}; use crate::routes::{api_router, AppState}; @@ -331,7 +331,7 @@ mod tests { #[sqlx::test(migrator = "MIGRATOR")] async fn list_s3_objects_api(pool: PgPool) { let state = AppState::from_pool(pool); - let entries = initialize_database(state.client(), 10).await; + let entries = initialize_database_reorder(state.client(), 10).await; let app = api_router(state); let response = app @@ -365,7 +365,7 @@ mod tests { #[sqlx::test(migrator = "MIGRATOR")] async fn list_s3_objects_api_paginate(pool: PgPool) { let state = AppState::from_pool(pool); - let entries = initialize_database(state.client(), 10).await; + let entries = initialize_database_reorder(state.client(), 10).await; let app = api_router(state); let response = app @@ -399,7 +399,7 @@ mod tests { #[sqlx::test(migrator = "MIGRATOR")] async fn list_s3_objects_api_zero_page_size(pool: PgPool) { let state = AppState::from_pool(pool); - let entries = initialize_database(state.client(), 10).await; + let entries = initialize_database_reorder(state.client(), 10).await; let app = api_router(state); let response = app From f70d306aeba6d2e03b9cdf55f7e83c382e6b5865 Mon Sep 17 00:00:00 2001 From: Marko Malenic Date: Mon, 8 Jul 2024 12:58:50 +1000 Subject: [PATCH 2/2] feat(filemanager): add sequencer index --- .../database/migrations/0003_add_sequencer_index.sql | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 lib/workload/stateless/stacks/filemanager/database/migrations/0003_add_sequencer_index.sql diff --git a/lib/workload/stateless/stacks/filemanager/database/migrations/0003_add_sequencer_index.sql b/lib/workload/stateless/stacks/filemanager/database/migrations/0003_add_sequencer_index.sql new file mode 100644 index 000000000..195f7e9bb --- /dev/null +++ b/lib/workload/stateless/stacks/filemanager/database/migrations/0003_add_sequencer_index.sql @@ -0,0 +1,4 @@ +-- Create an index for ordering `s3_object`s in ascending order. +-- TODO this should be created `concurrently` when running migrations without transactions is released: +-- https://github.com/launchbadge/sqlx/issues/767 +create index sequencer_index on s3_object (sequencer);