From 0d466bfafdf10f4151f712822c50d1b2dbb8822f Mon Sep 17 00:00:00 2001 From: Ion Koutsouris <15728914+ion-elgreco@users.noreply.github.com> Date: Thu, 9 Jan 2025 19:12:50 +0100 Subject: [PATCH] chore: execute handler tests --- .github/workflows/build.yml | 4 +- crates/lakefs/src/execute.rs | 313 +++++++++++++++++++++++++++++++++++ 2 files changed, 315 insertions(+), 2 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index e04e1e895d..578ae305ea 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -165,8 +165,8 @@ jobs: - name: Download Lakectl run: | - wget -q https://github.com/treeverse/lakeFS/releases/download/v1.48.0/lakeFS_1.48.0_Linux_x86_64.tar.gz - tar -xf lakeFS_1.48.0_Linux_x86_64.tar.gz -C $GITHUB_WORKSPACE + wget -q https://github.com/treeverse/lakeFS/releases/download/v1.48.1/lakeFS_1.48.1_Linux_x86_64.tar.gz + tar -xf lakeFS_1.48.1_Linux_x86_64.tar.gz -C $GITHUB_WORKSPACE echo "$GITHUB_WORKSPACE" >> $GITHUB_PATH - name: Start emulated services diff --git a/crates/lakefs/src/execute.rs b/crates/lakefs/src/execute.rs index bada34ae4c..fa4a077adb 100644 --- a/crates/lakefs/src/execute.rs +++ b/crates/lakefs/src/execute.rs @@ -85,3 +85,316 @@ impl CustomExecuteHandler for LakeFSCustomExecuteHandler { Ok(()) } } + +#[cfg(test)] +mod tests { + use crate::register_handlers; + + use super::*; + use deltalake_core::{logstore::logstore_for, storage::ObjectStoreRegistry}; + use http::StatusCode; + use maplit::hashmap; + use std::{collections::HashMap, sync::OnceLock}; + use tokio::runtime::Runtime; + use url::Url; + use uuid::Uuid; + + fn setup_env(server_url: String) -> LogStoreRef { + register_handlers(None); + let location = Url::parse("lakefs://repo/branch/table").unwrap(); + let raw_options = hashmap! { + "ACCESS_KEY_ID".to_string() => "options_key".to_string(), + "ENDPOINT_URL".to_string() => server_url, + "SECRET_ACCESS_KEY".to_string() => "options_key".to_string(), + "REGION".to_string() => "options_key".to_string() + }; + logstore_for(location, raw_options, None).unwrap() + } + + #[inline] + fn rt() -> &'static Runtime { + static TOKIO_RT: OnceLock = OnceLock::new(); + TOKIO_RT.get_or_init(|| Runtime::new().expect("Failed to create a tokio runtime.")) + } + + #[test] + fn test_pre_execute() { + let handler = LakeFSCustomExecuteHandler {}; + let operation_id = Uuid::new_v4(); + + let mut server = mockito::Server::new(); + let mock = server + .mock("POST", "/api/v1/repositories/repo/branches") + .with_status(StatusCode::CREATED.as_u16().into()) + .with_body("") + .create(); + + let lakefs_store = setup_env(server.url()); + + let result = + rt().block_on(async { handler.pre_execute(&lakefs_store, operation_id).await }); + mock.assert(); + assert!(result.is_ok()); + + if let Some(lakefs_store) = lakefs_store + .clone() + .as_any() + .downcast_ref::() + { + assert!(lakefs_store + .storage + .get_store( + &Url::parse(format!("lakefs://repo/delta-tx-{}/table", operation_id).as_str()) + .unwrap() + ) + .is_ok()); + + assert!(lakefs_store.client.get_transaction(operation_id).is_ok()) + } else { + unreachable!() + } + } + + #[test] + fn test_before_post_commit_hook() { + let handler = LakeFSCustomExecuteHandler {}; + let operation_id = Uuid::new_v4(); + + let mut server = mockito::Server::new(); + let mock = server + .mock("POST", "/api/v1/repositories/repo/branches") + .with_status(StatusCode::CREATED.as_u16().into()) + .with_body("") + .create(); + + let lakefs_store = setup_env(server.url()); + + let result = rt().block_on(async { + handler + .before_post_commit_hook(&lakefs_store, true, operation_id) + .await + }); + mock.assert(); + assert!(result.is_ok()); + + if let Some(lakefs_store) = lakefs_store + .clone() + .as_any() + .downcast_ref::() + { + assert!(lakefs_store + .storage + .get_store( + &Url::parse(format!("lakefs://repo/delta-tx-{}/table", operation_id).as_str()) + .unwrap() + ) + .is_ok()); + + assert!(lakefs_store.client.get_transaction(operation_id).is_ok()) + } else { + unreachable!() + } + } + + #[test] + fn test_post_execute() { + let handler = LakeFSCustomExecuteHandler {}; + let operation_id = Uuid::new_v4(); + + let mut server = mockito::Server::new(); + let mock_1 = server + .mock("POST", "/api/v1/repositories/repo/branches") + .with_status(StatusCode::CREATED.as_u16().into()) + .with_body("") + .create(); + let mock_2 = server + .mock( + "DELETE", + format!( + "/api/v1/repositories/repo/branches/delta-tx-{}", + operation_id + ) + .as_str(), + ) + .with_status(StatusCode::NO_CONTENT.as_u16().into()) + .create(); + + let lakefs_store = setup_env(server.url()); + + rt().block_on(async { handler.pre_execute(&lakefs_store, operation_id).await }) + .unwrap(); + mock_1.assert(); + + let result = + rt().block_on(async { handler.post_execute(&lakefs_store, operation_id).await }); + + assert!(result.is_ok()); + mock_2.assert(); + + if let Some(lakefs_store) = lakefs_store + .clone() + .as_any() + .downcast_ref::() + { + assert!(lakefs_store.client.get_transaction(operation_id).is_err()) + } else { + unreachable!() + } + } + + #[test] + fn test_after_post_commit_hook() { + let handler = LakeFSCustomExecuteHandler {}; + let operation_id = Uuid::new_v4(); + + let mut server = mockito::Server::new(); + let create_branch_mock = server + .mock("POST", "/api/v1/repositories/repo/branches") + .with_status(StatusCode::CREATED.as_u16().into()) + .with_body("") + .create(); + + let create_commit_mock = server + .mock( + "POST", + format!( + "/api/v1/repositories/repo/branches/delta-tx-{}/commits", + operation_id + ) + .as_str(), + ) + .with_status(StatusCode::CREATED.as_u16().into()) + .create(); + + let merge_branch_mock = server + .mock( + "POST", + format!( + "/api/v1/repositories/repo/refs/delta-tx-{}/merge/branch", + operation_id + ) + .as_str(), + ) + .with_status(StatusCode::OK.as_u16().into()) + .create(); + + let delete_branch_mock = server + .mock( + "DELETE", + format!( + "/api/v1/repositories/repo/branches/delta-tx-{}", + operation_id + ) + .as_str(), + ) + .with_status(StatusCode::NO_CONTENT.as_u16().into()) + .create(); + + let lakefs_store = setup_env(server.url()); + + let result = rt().block_on(async { + handler + .before_post_commit_hook(&lakefs_store, true, operation_id) + .await + }); + create_branch_mock.assert(); + assert!(result.is_ok()); + + let result = rt().block_on(async { + handler + .after_post_commit_hook(&lakefs_store, true, operation_id) + .await + }); + + create_commit_mock.assert(); + merge_branch_mock.assert(); + delete_branch_mock.assert(); + assert!(result.is_ok()); + + if let Some(lakefs_store) = lakefs_store + .clone() + .as_any() + .downcast_ref::() + { + assert!(lakefs_store.client.get_transaction(operation_id).is_err()) + } else { + unreachable!() + } + } + + #[tokio::test] + async fn test_execute_error_with_invalid_log_store() { + let location = Url::parse("memory://table").unwrap(); + let invalid_default_store = logstore_for(location, HashMap::default(), None).unwrap(); + + let handler = LakeFSCustomExecuteHandler {}; + let operation_id = Uuid::new_v4(); + + let result = handler + .post_execute(&invalid_default_store, operation_id) + .await; + assert!(result.is_err()); + if let Err(DeltaTableError::Generic(err)) = result { + assert_eq!( + err, + "LakeFSPreEcuteHandler is used, but no LakeFSLogStore has been found" + ); + } + + let result = handler + .pre_execute(&invalid_default_store, operation_id) + .await; + assert!(result.is_err()); + if let Err(DeltaTableError::Generic(err)) = result { + assert_eq!( + err, + "LakeFSPreEcuteHandler is used, but no LakeFSLogStore has been found" + ); + } + + let result = handler + .before_post_commit_hook(&invalid_default_store, true, operation_id) + .await; + assert!(result.is_err()); + if let Err(DeltaTableError::Generic(err)) = result { + assert_eq!( + err, + "LakeFSPreEcuteHandler is used, but no LakeFSLogStore has been found" + ); + } + + let result = handler + .after_post_commit_hook(&invalid_default_store, true, operation_id) + .await; + assert!(result.is_err()); + if let Err(DeltaTableError::Generic(err)) = result { + assert_eq!( + err, + "LakeFSPreEcuteHandler is used, but no LakeFSLogStore has been found" + ); + } + } + + #[tokio::test] + async fn test_noop_commit_hook_executor() { + // When file operations is false, the commit hook executor is a noop, since we don't need + // to create any branches, or commit and merge them back. + let location = Url::parse("memory://table").unwrap(); + let invalid_default_store = logstore_for(location, HashMap::default(), None).unwrap(); + + let handler = LakeFSCustomExecuteHandler {}; + let operation_id = Uuid::new_v4(); + + let result = handler + .before_post_commit_hook(&invalid_default_store, false, operation_id) + .await; + assert!(result.is_ok()); + assert_eq!(result.unwrap(), ()); + + let result = handler + .after_post_commit_hook(&invalid_default_store, false, operation_id) + .await; + assert!(result.is_ok()); + assert_eq!(result.unwrap(), ()); + } +}