diff --git a/lib/workload/stateless/filemanager/filemanager/src/database/aws/ingester.rs b/lib/workload/stateless/filemanager/filemanager/src/database/aws/ingester.rs index 8b7b1a0e9..06ccf71d4 100644 --- a/lib/workload/stateless/filemanager/filemanager/src/database/aws/ingester.rs +++ b/lib/workload/stateless/filemanager/filemanager/src/database/aws/ingester.rs @@ -1195,171 +1195,157 @@ pub(crate) mod tests { // 720 permutations run_permutation_test(&pool, event_permutations, 5, |s3_object_results| { - s3_object_results - .iter() - .find(|object| { - object.get::("key") == "key" - && object.get::("bucket") == "bucket" - && object.get::("version_id") - == FlatS3EventMessage::default_version_id() - && object.get::, _>("created_sequencer") - == Some("1".to_string()) - && object.get::, _>("deleted_sequencer") - == Some("2".to_string()) - }) - .unwrap(); - s3_object_results - .iter() - .find(|object| { - object.get::("key") == "key" - && object.get::("bucket") == "bucket" - && object.get::("version_id") - == FlatS3EventMessage::default_version_id() - && object - .get::, _>("created_sequencer") - .is_none() - && object.get::, _>("deleted_sequencer") - == Some("3".to_string()) - }) - .unwrap(); - s3_object_results - .iter() - .find(|object| { - object.get::("key") == "key" - && object.get::("bucket") == "bucket" - && object.get::("version_id") - == FlatS3EventMessage::default_version_id() - && object.get::, _>("created_sequencer") - == Some("4".to_string()) - && object - .get::, _>("deleted_sequencer") - .is_none() - }) - .unwrap(); - s3_object_results - .iter() - .find(|object| { - object.get::("key") == "key" - && object.get::("bucket") == "bucket" - && object.get::("version_id") - == FlatS3EventMessage::default_version_id() - && object.get::, _>("created_sequencer") - == Some("5".to_string()) - && object - .get::, _>("deleted_sequencer") - .is_none() - }) - .unwrap(); - s3_object_results - .iter() - .find(|object| { - object.get::("key") == "key1" - && object.get::("bucket") == "bucket" - && object.get::("version_id") - == FlatS3EventMessage::default_version_id() - && object.get::, _>("created_sequencer") - == Some("1".to_string()) - && object - .get::, _>("deleted_sequencer") - .is_none() - }) - .unwrap(); + find_object_with( + &s3_object_results, + "key", + "bucket", + &FlatS3EventMessage::default_version_id(), + Some("1"), + Some("2"), + ) + .unwrap(); + find_object_with( + &s3_object_results, + "key", + "bucket", + &FlatS3EventMessage::default_version_id(), + None, + Some("3"), + ) + .unwrap(); + find_object_with( + &s3_object_results, + "key", + "bucket", + &FlatS3EventMessage::default_version_id(), + Some("4"), + None, + ) + .unwrap(); + find_object_with( + &s3_object_results, + "key", + "bucket", + &FlatS3EventMessage::default_version_id(), + Some("5"), + None, + ) + .unwrap(); + find_object_with( + &s3_object_results, + "key1", + "bucket", + &FlatS3EventMessage::default_version_id(), + Some("1"), + None, + ) + .unwrap(); }) .await; } #[sqlx::test(migrator = "MIGRATOR")] async fn ingest_permutations_small(pool: PgPool) { - let event_permutations = vec![ - FlatS3EventMessage::new_with_generated_id() - .with_bucket("bucket".to_string()) - .with_key("key".to_string()) - .with_version_id("version_id".to_string()) - .with_event_type(Created) - .with_sequencer(Some("1".to_string())), - FlatS3EventMessage::new_with_generated_id() - .with_bucket("bucket".to_string()) - .with_key("key".to_string()) - .with_version_id("version_id".to_string()) - .with_event_type(Deleted) - .with_sequencer(Some("2".to_string())), - FlatS3EventMessage::new_with_generated_id() - .with_bucket("bucket".to_string()) - .with_key("key".to_string()) - .with_version_id("version_id".to_string()) - .with_event_type(Created) - .with_sequencer(Some("3".to_string())), - // Duplicate + let event_permutations = example_event_permutations(); + + // 720 permutations + run_permutation_test(&pool, event_permutations, 3, |s3_object_results| { + find_object_with( + &s3_object_results, + "key", + "bucket", + "version_id", + Some("1"), + Some("2"), + ) + .unwrap(); + find_object_with( + &s3_object_results, + "key", + "bucket", + "version_id", + Some("3"), + Some("4"), + ) + .unwrap(); + find_object_with( + &s3_object_results, + "key", + "bucket", + "version_id1", + None, + Some("1"), + ) + .unwrap(); + }) + .await; + } + + #[ignore] + #[sqlx::test(migrator = "MIGRATOR")] + async fn ingest_permutations(pool: PgPool) { + // This primarily tests out of order and duplicate event ingestion, however it could also function + // as a performance test. + let mut event_permutations = example_event_permutations(); + event_permutations.extend(vec![ FlatS3EventMessage::new_with_generated_id() .with_bucket("bucket".to_string()) .with_key("key".to_string()) .with_version_id("version_id".to_string()) .with_event_type(Created) - .with_sequencer(Some("3".to_string())), + .with_sequencer(Some("5".to_string())), FlatS3EventMessage::new_with_generated_id() .with_bucket("bucket".to_string()) .with_key("key".to_string()) .with_version_id("version_id".to_string()) .with_event_type(Deleted) - .with_sequencer(Some("4".to_string())), - // Different version id - FlatS3EventMessage::new_with_generated_id() - .with_bucket("bucket".to_string()) - .with_key("key".to_string()) - .with_version_id("version_id1".to_string()) - .with_event_type(Deleted) - .with_sequencer(Some("1".to_string())), - ]; + .with_sequencer(Some("6".to_string())), + ]); - // 720 permutations - run_permutation_test(&pool, event_permutations, 3, |s3_object_results| { - s3_object_results - .iter() - .find(|object| { - object.get::("key") == "key" - && object.get::("bucket") == "bucket" - && object.get::("version_id") == *"version_id" - && object.get::, _>("created_sequencer") - == Some("1".to_string()) - && object.get::, _>("deleted_sequencer") - == Some("2".to_string()) - }) - .unwrap(); - s3_object_results - .iter() - .find(|object| { - object.get::("key") == "key" - && object.get::("bucket") == "bucket" - && object.get::("version_id") == *"version_id" - && object.get::, _>("created_sequencer") - == Some("3".to_string()) - && object.get::, _>("deleted_sequencer") - == Some("4".to_string()) - }) - .unwrap(); - s3_object_results - .iter() - .find(|object| { - object.get::("key") == "key" - && object.get::("bucket") == "bucket" - && object.get::("version_id") == *"version_id1" - && object - .get::, _>("created_sequencer") - .is_none() - && object.get::, _>("deleted_sequencer") - == Some("1".to_string()) - }) - .unwrap(); + // 40320 permutations + run_permutation_test(&pool, event_permutations, 4, |s3_object_results| { + find_object_with( + &s3_object_results, + "key", + "bucket", + "version_id", + Some("1"), + Some("2"), + ) + .unwrap(); + find_object_with( + &s3_object_results, + "key", + "bucket", + "version_id", + Some("3"), + Some("4"), + ) + .unwrap(); + find_object_with( + &s3_object_results, + "key", + "bucket", + "version_id", + Some("5"), + Some("6"), + ) + .unwrap(); + find_object_with( + &s3_object_results, + "key", + "bucket", + "version_id1", + None, + Some("1"), + ) + .unwrap(); }) .await; } - #[ignore] - #[sqlx::test(migrator = "MIGRATOR")] - async fn ingest_permutations(pool: PgPool) { - // This primarily tests out of order and duplicate event ingestion, however it could also function - // as a performance test. - let event_permutations = vec![ + fn example_event_permutations() -> Vec { + vec![ FlatS3EventMessage::new_with_generated_id() .with_bucket("bucket".to_string()) .with_key("key".to_string()) @@ -1391,18 +1377,6 @@ pub(crate) mod tests { .with_version_id("version_id".to_string()) .with_event_type(Deleted) .with_sequencer(Some("4".to_string())), - FlatS3EventMessage::new_with_generated_id() - .with_bucket("bucket".to_string()) - .with_key("key".to_string()) - .with_version_id("version_id".to_string()) - .with_event_type(Created) - .with_sequencer(Some("5".to_string())), - FlatS3EventMessage::new_with_generated_id() - .with_bucket("bucket".to_string()) - .with_key("key".to_string()) - .with_version_id("version_id".to_string()) - .with_event_type(Deleted) - .with_sequencer(Some("6".to_string())), // Different version id FlatS3EventMessage::new_with_generated_id() .with_bucket("bucket".to_string()) @@ -1410,61 +1384,24 @@ pub(crate) mod tests { .with_version_id("version_id1".to_string()) .with_event_type(Deleted) .with_sequencer(Some("1".to_string())), - ]; + ] + } - // 40320 permutations - run_permutation_test(&pool, event_permutations, 4, |s3_object_results| { - s3_object_results - .iter() - .find(|object| { - object.get::("key") == "key" - && object.get::("bucket") == "bucket" - && object.get::("version_id") == *"version_id" - && object.get::, _>("created_sequencer") - == Some("1".to_string()) - && object.get::, _>("deleted_sequencer") - == Some("2".to_string()) - }) - .unwrap(); - s3_object_results - .iter() - .find(|object| { - object.get::("key") == "key" - && object.get::("bucket") == "bucket" - && object.get::("version_id") == *"version_id" - && object.get::, _>("created_sequencer") - == Some("3".to_string()) - && object.get::, _>("deleted_sequencer") - == Some("4".to_string()) - }) - .unwrap(); - s3_object_results - .iter() - .find(|object| { - object.get::("key") == "key" - && object.get::("bucket") == "bucket" - && object.get::("version_id") == *"version_id" - && object.get::, _>("created_sequencer") - == Some("5".to_string()) - && object.get::, _>("deleted_sequencer") - == Some("6".to_string()) - }) - .unwrap(); - s3_object_results - .iter() - .find(|object| { - object.get::("key") == "key" - && object.get::("bucket") == "bucket" - && object.get::("version_id") == *"version_id1" - && object - .get::, _>("created_sequencer") - .is_none() - && object.get::, _>("deleted_sequencer") - == Some("1".to_string()) - }) - .unwrap(); + fn find_object_with<'a>( + results: &'a [PgRow], + key: &str, + bucket: &str, + version_id: &str, + created_sequencer: Option<&str>, + deleted_sequencer: Option<&str>, + ) -> Option<&'a PgRow> { + results.iter().find(|object| { + object.get::("key") == key + && object.get::("bucket") == bucket + && object.get::("version_id") == version_id + && object.get::, _>("created_sequencer") == created_sequencer + && object.get::, _>("deleted_sequencer") == deleted_sequencer }) - .await; } async fn run_permutation_test(