Skip to content

Commit

Permalink
chore: execute handler tests
Browse files Browse the repository at this point in the history
  • Loading branch information
ion-elgreco committed Jan 9, 2025
1 parent 8e727c5 commit 0d466bf
Show file tree
Hide file tree
Showing 2 changed files with 315 additions and 2 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,8 @@ jobs:

- name: Download Lakectl
run: |
wget -q https://github.com/treeverse/lakeFS/releases/download/v1.48.0/lakeFS_1.48.0_Linux_x86_64.tar.gz
tar -xf lakeFS_1.48.0_Linux_x86_64.tar.gz -C $GITHUB_WORKSPACE
wget -q https://github.com/treeverse/lakeFS/releases/download/v1.48.1/lakeFS_1.48.1_Linux_x86_64.tar.gz
tar -xf lakeFS_1.48.1_Linux_x86_64.tar.gz -C $GITHUB_WORKSPACE
echo "$GITHUB_WORKSPACE" >> $GITHUB_PATH
- name: Start emulated services
Expand Down
313 changes: 313 additions & 0 deletions crates/lakefs/src/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,316 @@ impl CustomExecuteHandler for LakeFSCustomExecuteHandler {
Ok(())
}
}

#[cfg(test)]
mod tests {
use crate::register_handlers;

use super::*;
use deltalake_core::{logstore::logstore_for, storage::ObjectStoreRegistry};
use http::StatusCode;
use maplit::hashmap;
use std::{collections::HashMap, sync::OnceLock};
use tokio::runtime::Runtime;
use url::Url;
use uuid::Uuid;

fn setup_env(server_url: String) -> LogStoreRef {
register_handlers(None);
let location = Url::parse("lakefs://repo/branch/table").unwrap();
let raw_options = hashmap! {
"ACCESS_KEY_ID".to_string() => "options_key".to_string(),
"ENDPOINT_URL".to_string() => server_url,
"SECRET_ACCESS_KEY".to_string() => "options_key".to_string(),
"REGION".to_string() => "options_key".to_string()
};
logstore_for(location, raw_options, None).unwrap()
}

#[inline]
fn rt() -> &'static Runtime {
static TOKIO_RT: OnceLock<Runtime> = OnceLock::new();
TOKIO_RT.get_or_init(|| Runtime::new().expect("Failed to create a tokio runtime."))
}

#[test]
fn test_pre_execute() {
let handler = LakeFSCustomExecuteHandler {};
let operation_id = Uuid::new_v4();

let mut server = mockito::Server::new();
let mock = server
.mock("POST", "/api/v1/repositories/repo/branches")
.with_status(StatusCode::CREATED.as_u16().into())
.with_body("")
.create();

let lakefs_store = setup_env(server.url());

let result =
rt().block_on(async { handler.pre_execute(&lakefs_store, operation_id).await });
mock.assert();
assert!(result.is_ok());

if let Some(lakefs_store) = lakefs_store
.clone()
.as_any()
.downcast_ref::<LakeFSLogStore>()
{
assert!(lakefs_store
.storage
.get_store(
&Url::parse(format!("lakefs://repo/delta-tx-{}/table", operation_id).as_str())
.unwrap()
)
.is_ok());

assert!(lakefs_store.client.get_transaction(operation_id).is_ok())
} else {
unreachable!()
}
}

#[test]
fn test_before_post_commit_hook() {
let handler = LakeFSCustomExecuteHandler {};
let operation_id = Uuid::new_v4();

let mut server = mockito::Server::new();
let mock = server
.mock("POST", "/api/v1/repositories/repo/branches")
.with_status(StatusCode::CREATED.as_u16().into())
.with_body("")
.create();

let lakefs_store = setup_env(server.url());

let result = rt().block_on(async {
handler
.before_post_commit_hook(&lakefs_store, true, operation_id)
.await
});
mock.assert();
assert!(result.is_ok());

if let Some(lakefs_store) = lakefs_store
.clone()
.as_any()
.downcast_ref::<LakeFSLogStore>()
{
assert!(lakefs_store
.storage
.get_store(
&Url::parse(format!("lakefs://repo/delta-tx-{}/table", operation_id).as_str())
.unwrap()
)
.is_ok());

assert!(lakefs_store.client.get_transaction(operation_id).is_ok())
} else {
unreachable!()
}
}

#[test]
fn test_post_execute() {
let handler = LakeFSCustomExecuteHandler {};
let operation_id = Uuid::new_v4();

let mut server = mockito::Server::new();
let mock_1 = server
.mock("POST", "/api/v1/repositories/repo/branches")
.with_status(StatusCode::CREATED.as_u16().into())
.with_body("")
.create();
let mock_2 = server
.mock(
"DELETE",
format!(
"/api/v1/repositories/repo/branches/delta-tx-{}",
operation_id
)
.as_str(),
)
.with_status(StatusCode::NO_CONTENT.as_u16().into())
.create();

let lakefs_store = setup_env(server.url());

rt().block_on(async { handler.pre_execute(&lakefs_store, operation_id).await })
.unwrap();
mock_1.assert();

let result =
rt().block_on(async { handler.post_execute(&lakefs_store, operation_id).await });

assert!(result.is_ok());
mock_2.assert();

if let Some(lakefs_store) = lakefs_store
.clone()
.as_any()
.downcast_ref::<LakeFSLogStore>()
{
assert!(lakefs_store.client.get_transaction(operation_id).is_err())
} else {
unreachable!()
}
}

#[test]
fn test_after_post_commit_hook() {
let handler = LakeFSCustomExecuteHandler {};
let operation_id = Uuid::new_v4();

let mut server = mockito::Server::new();
let create_branch_mock = server
.mock("POST", "/api/v1/repositories/repo/branches")
.with_status(StatusCode::CREATED.as_u16().into())
.with_body("")
.create();

let create_commit_mock = server
.mock(
"POST",
format!(
"/api/v1/repositories/repo/branches/delta-tx-{}/commits",
operation_id
)
.as_str(),
)
.with_status(StatusCode::CREATED.as_u16().into())
.create();

let merge_branch_mock = server
.mock(
"POST",
format!(
"/api/v1/repositories/repo/refs/delta-tx-{}/merge/branch",
operation_id
)
.as_str(),
)
.with_status(StatusCode::OK.as_u16().into())
.create();

let delete_branch_mock = server
.mock(
"DELETE",
format!(
"/api/v1/repositories/repo/branches/delta-tx-{}",
operation_id
)
.as_str(),
)
.with_status(StatusCode::NO_CONTENT.as_u16().into())
.create();

let lakefs_store = setup_env(server.url());

let result = rt().block_on(async {
handler
.before_post_commit_hook(&lakefs_store, true, operation_id)
.await
});
create_branch_mock.assert();
assert!(result.is_ok());

let result = rt().block_on(async {
handler
.after_post_commit_hook(&lakefs_store, true, operation_id)
.await
});

create_commit_mock.assert();
merge_branch_mock.assert();
delete_branch_mock.assert();
assert!(result.is_ok());

if let Some(lakefs_store) = lakefs_store
.clone()
.as_any()
.downcast_ref::<LakeFSLogStore>()
{
assert!(lakefs_store.client.get_transaction(operation_id).is_err())
} else {
unreachable!()
}
}

#[tokio::test]
async fn test_execute_error_with_invalid_log_store() {
let location = Url::parse("memory://table").unwrap();
let invalid_default_store = logstore_for(location, HashMap::default(), None).unwrap();

let handler = LakeFSCustomExecuteHandler {};
let operation_id = Uuid::new_v4();

let result = handler
.post_execute(&invalid_default_store, operation_id)
.await;
assert!(result.is_err());
if let Err(DeltaTableError::Generic(err)) = result {
assert_eq!(
err,
"LakeFSPreEcuteHandler is used, but no LakeFSLogStore has been found"
);
}

let result = handler
.pre_execute(&invalid_default_store, operation_id)
.await;
assert!(result.is_err());
if let Err(DeltaTableError::Generic(err)) = result {
assert_eq!(
err,
"LakeFSPreEcuteHandler is used, but no LakeFSLogStore has been found"
);
}

let result = handler
.before_post_commit_hook(&invalid_default_store, true, operation_id)
.await;
assert!(result.is_err());
if let Err(DeltaTableError::Generic(err)) = result {
assert_eq!(
err,
"LakeFSPreEcuteHandler is used, but no LakeFSLogStore has been found"
);
}

let result = handler
.after_post_commit_hook(&invalid_default_store, true, operation_id)
.await;
assert!(result.is_err());
if let Err(DeltaTableError::Generic(err)) = result {
assert_eq!(
err,
"LakeFSPreEcuteHandler is used, but no LakeFSLogStore has been found"
);
}
}

#[tokio::test]
async fn test_noop_commit_hook_executor() {
// When file operations is false, the commit hook executor is a noop, since we don't need
// to create any branches, or commit and merge them back.
let location = Url::parse("memory://table").unwrap();
let invalid_default_store = logstore_for(location, HashMap::default(), None).unwrap();

let handler = LakeFSCustomExecuteHandler {};
let operation_id = Uuid::new_v4();

let result = handler
.before_post_commit_hook(&invalid_default_store, false, operation_id)
.await;
assert!(result.is_ok());
assert_eq!(result.unwrap(), ());

let result = handler
.after_post_commit_hook(&invalid_default_store, false, operation_id)
.await;
assert!(result.is_ok());
assert_eq!(result.unwrap(), ());
}
}

0 comments on commit 0d466bf

Please sign in to comment.