feat: lakefs integration #3103

Merged · 11 commits · Jan 10, 2025
37 changes: 37 additions & 0 deletions .github/workflows/build.yml
@@ -139,3 +139,40 @@ jobs:
- name: Run tests with native-tls
run: |
cargo test --no-default-features --features integration_test,s3-native-tls,datafusion

integration_test_lakefs:
name: Integration Tests (LakeFS v1.48)
runs-on: ubuntu-latest
env:
CARGO_INCREMENTAL: 0
# Disable full debug symbol generation to speed up CI build and keep memory down
# <https://doc.rust-lang.org/cargo/reference/profiles.html>
RUSTFLAGS: "-C debuginfo=line-tables-only"
# https://github.com/rust-lang/cargo/issues/10280
CARGO_NET_GIT_FETCH_WITH_CLI: "true"
RUST_BACKTRACE: "1"
RUST_LOG: debug

steps:
- uses: actions/checkout@v3

- name: Install minimal stable with clippy and rustfmt
uses: actions-rs/toolchain@v1
with:
profile: default
toolchain: '1.81'
override: true

- name: Download Lakectl
run: |
wget -q https://github.com/treeverse/lakeFS/releases/download/v1.48.1/lakeFS_1.48.1_Linux_x86_64.tar.gz
tar -xf lakeFS_1.48.1_Linux_x86_64.tar.gz -C $GITHUB_WORKSPACE
echo "$GITHUB_WORKSPACE" >> $GITHUB_PATH

- name: Start emulated services
run: docker compose -f docker-compose-lakefs.yml up -d

- name: Run tests with rustls (default)
run: |
cargo test --features integration_test_lakefs,lakefs,datafusion

2 changes: 1 addition & 1 deletion .github/workflows/codecov.yml
@@ -26,7 +26,7 @@ jobs:
uses: taiki-e/install-action@cargo-llvm-cov
- uses: Swatinem/rust-cache@v2
- name: Generate code coverage
run: cargo llvm-cov --features ${DEFAULT_FEATURES} --workspace --codecov --output-path codecov.json -- --skip read_table_version_hdfs
run: cargo llvm-cov --features ${DEFAULT_FEATURES} --workspace --codecov --output-path codecov.json -- --skip read_table_version_hdfs --skip test_read_tables_lakefs
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v4
with:
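The coverage run now also skips `test_read_tables_lakefs`, since that test needs the LakeFS services from `docker-compose-lakefs.yml` and the coverage job never starts them. As a rough illustration only — the test name comes from the skip flag above, but the URI, environment variable, and body are assumptions — a feature-gated LakeFS integration test could look roughly like this:

```rust
// Hypothetical sketch, not the test from this PR. Assumes the lakefs:// scheme
// handler is registered (the new `lakefs` feature) and that the services from
// `docker compose -f docker-compose-lakefs.yml up -d` are running.
#[cfg(all(test, feature = "integration_test_lakefs"))]
mod lakefs_integration {
    #[tokio::test]
    async fn test_read_tables_lakefs() {
        // LAKEFS_TABLE_URI is an assumed variable for this sketch.
        let table_uri = std::env::var("LAKEFS_TABLE_URI")
            .unwrap_or_else(|_| "lakefs://example-repo/main/my_table".to_string());

        // Without a live LakeFS instance this open fails, which is exactly why
        // the coverage job skips the test.
        let table = deltalake_core::open_table(&table_uri)
            .await
            .expect("LakeFS-backed table should open when the service is up");
        println!("opened table at version {:?}", table.version());
    }
}
```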
30 changes: 28 additions & 2 deletions .github/workflows/python_build.yml
@@ -77,13 +77,38 @@ jobs:
run: make setup-dat

- name: Run tests
run: uv run pytest -m '((s3 or azure) and integration) or not integration and not benchmark' --doctest-modules
run: uv run --no-sync pytest -m '((s3 or azure) and integration) or not integration and not benchmark' --doctest-modules

- name: Test without pandas
run: |
uv pip uninstall pandas
uv run pytest -m "not pandas and not integration and not benchmark"
uv run --no-sync pytest -m "not pandas and not integration and not benchmark"
uv pip install pandas

test-lakefs:
name: Python Build (Python 3.10 LakeFS Integration tests)
runs-on: ubuntu-latest
env:
RUSTFLAGS: "-C debuginfo=1"
CARGO_INCREMENTAL: 0

steps:
- uses: actions/checkout@v3

- name: Setup Environment
uses: ./.github/actions/setup-env

- name: Start emulated services
run: docker compose -f ../docker-compose-lakefs.yml up -d

- name: Build and install deltalake
run: make develop

- name: Download Data Acceptance Tests (DAT) files
run: make setup-dat

- name: Run tests
run: uv run --no-sync pytest -m '(lakefs and integration)' --doctest-modules

test-pyspark:
name: PySpark Integration Tests
@@ -109,6 +134,7 @@ jobs:
- name: Run tests
run: make test-pyspark


multi-python-running:
name: Running with Python ${{ matrix.python-version }}
runs-on: ubuntu-latest
16 changes: 11 additions & 5 deletions crates/aws/src/logstore/default_logstore.rs
@@ -11,6 +11,7 @@ use deltalake_core::{
};
use object_store::{Error as ObjectStoreError, ObjectStore};
use url::Url;
use uuid::Uuid;

/// Return the [S3LogStore] implementation with the provided configuration options
pub fn default_s3_logstore(
@@ -30,7 +31,7 @@ pub fn default_s3_logstore(
/// Default [`LogStore`] implementation
#[derive(Debug, Clone)]
pub struct S3LogStore {
pub(crate) storage: Arc<dyn ObjectStore>,
pub(crate) storage: ObjectStoreRef,
config: LogStoreConfig,
}

@@ -53,7 +54,7 @@ impl LogStore for S3LogStore {
}

async fn read_commit_entry(&self, version: i64) -> DeltaResult<Option<Bytes>> {
read_commit_entry(self.storage.as_ref(), version).await
read_commit_entry(self.object_store(None).as_ref(), version).await
}

/// Tries to commit a prepared commit file. Returns [`TransactionError`]
@@ -65,10 +66,14 @@ impl LogStore for S3LogStore {
&self,
version: i64,
commit_or_bytes: CommitOrBytes,
_operation_id: Uuid,
) -> Result<(), TransactionError> {
match commit_or_bytes {
CommitOrBytes::TmpCommit(tmp_commit) => {
Ok(write_commit_entry(&self.object_store(), version, &tmp_commit).await?)
Ok(
write_commit_entry(self.object_store(None).as_ref(), version, &tmp_commit)
.await?,
)
}
_ => unreachable!(), // S3 Log Store should never receive bytes
}
@@ -87,10 +92,11 @@ impl LogStore for S3LogStore {
&self,
version: i64,
commit_or_bytes: CommitOrBytes,
_operation_id: Uuid,
) -> Result<(), TransactionError> {
match &commit_or_bytes {
CommitOrBytes::TmpCommit(tmp_commit) => {
abort_commit_entry(self.storage.as_ref(), version, tmp_commit).await
abort_commit_entry(self.object_store(None).as_ref(), version, tmp_commit).await
}
_ => unreachable!(), // S3 Log Store should never receive bytes
}
@@ -104,7 +110,7 @@ impl LogStore for S3LogStore {
get_earliest_version(self, current_version).await
}

fn object_store(&self) -> Arc<dyn ObjectStore> {
fn object_store(&self, _operation_id: Option<Uuid>) -> Arc<dyn ObjectStore> {
self.storage.clone()
}

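Taken together with the DynamoDB log store below, these edits imply a widened log-store interface: `object_store` now accepts an optional operation id, and committing or aborting a commit entry carries a `Uuid`. The following is a sketch of that surface as inferred from the diff; the real trait in `deltalake_core` has more methods, and the helper types here are simplified stand-ins:

```rust
// Sketch of the LogStore methods touched by this PR, inferred from the diff.
// Types below are simplified stand-ins for the real deltalake_core types.
use std::sync::Arc;

use bytes::Bytes;
use object_store::{path::Path, ObjectStore};
use uuid::Uuid;

/// Simplified stand-in for deltalake_core's CommitOrBytes.
pub enum CommitOrBytes {
    /// S3-style stores stage a temporary commit file and rename it into place.
    TmpCommit(Path),
    /// Other stores may hand the raw commit bytes to the log store directly.
    LogBytes(Bytes),
}

/// Placeholder standing in for TransactionError / DeltaResult.
pub type SketchResult<T> = Result<T, Box<dyn std::error::Error + Send + Sync>>;

#[async_trait::async_trait]
pub trait LogStoreSketch: Send + Sync {
    /// The operation id lets a backend hand out an operation-scoped store
    /// (useful for transactional backends such as LakeFS); the S3 stores in
    /// this PR ignore it and return their single backing store.
    fn object_store(&self, operation_id: Option<Uuid>) -> Arc<dyn ObjectStore>;

    async fn read_commit_entry(&self, version: i64) -> SketchResult<Option<Bytes>>;

    /// Commit a prepared commit file; the operation id ties the write to an
    /// in-flight operation for stores that need it.
    async fn write_commit_entry(
        &self,
        version: i64,
        commit_or_bytes: CommitOrBytes,
        operation_id: Uuid,
    ) -> SketchResult<()>;

    async fn abort_commit_entry(
        &self,
        version: i64,
        commit_or_bytes: CommitOrBytes,
        operation_id: Uuid,
    ) -> SketchResult<()>;
}
```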
17 changes: 13 additions & 4 deletions crates/aws/src/logstore/dynamodb_logstore.rs
@@ -18,6 +18,7 @@ use deltalake_core::{
storage::{ObjectStoreRef, StorageOptions},
DeltaResult, DeltaTableError,
};
use uuid::Uuid;

const STORE_NAME: &str = "DeltaS3ObjectStore";
const MAX_REPAIR_RETRIES: i64 = 3;
@@ -93,7 +94,13 @@ impl S3DynamoDbLogStore {
return Ok(RepairLogEntryResult::AlreadyCompleted);
}
for retry in 0..=MAX_REPAIR_RETRIES {
match write_commit_entry(&self.storage, entry.version, &entry.temp_path).await {
match write_commit_entry(
self.object_store(None).as_ref(),
entry.version,
&entry.temp_path,
)
.await
{
Ok(()) => {
debug!("Successfully committed entry for version {}", entry.version);
return self.try_complete_entry(entry, true).await;
@@ -192,7 +199,7 @@ impl LogStore for S3DynamoDbLogStore {
if let Ok(Some(entry)) = entry {
self.repair_entry(&entry).await?;
}
read_commit_entry(&self.storage, version).await
read_commit_entry(self.object_store(None).as_ref(), version).await
}

/// Tries to commit a prepared commit file. Returns [DeltaTableError::VersionAlreadyExists]
@@ -204,6 +211,7 @@ impl LogStore for S3DynamoDbLogStore {
&self,
version: i64,
commit_or_bytes: CommitOrBytes,
_operation_id: Uuid,
) -> Result<(), TransactionError> {
let tmp_commit = match commit_or_bytes {
CommitOrBytes::TmpCommit(tmp_commit) => tmp_commit,
@@ -253,6 +261,7 @@ impl LogStore for S3DynamoDbLogStore {
&self,
version: i64,
commit_or_bytes: CommitOrBytes,
_operation_id: Uuid,
) -> Result<(), TransactionError> {
let tmp_commit = match commit_or_bytes {
CommitOrBytes::TmpCommit(tmp_commit) => tmp_commit,
@@ -278,7 +287,7 @@ impl LogStore for S3DynamoDbLogStore {
},
})?;

abort_commit_entry(&self.storage, version, &tmp_commit).await?;
abort_commit_entry(self.object_store(None).as_ref(), version, &tmp_commit).await?;
Ok(())
}

@@ -304,7 +313,7 @@ impl LogStore for S3DynamoDbLogStore {
get_earliest_version(self, current_version).await
}

fn object_store(&self) -> ObjectStoreRef {
fn object_store(&self, _operation_id: Option<Uuid>) -> ObjectStoreRef {
self.storage.clone()
}

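From a caller's perspective, the new parameter means one `Uuid` is generated per logical operation and threaded through both the store lookup and the commit/abort calls. A hedged usage sketch, reusing the placeholder names from the trait sketch above (this is not code from the PR):

```rust
// Hedged usage sketch: how a caller might thread one operation id through the
// updated log-store methods sketched above.
use uuid::Uuid;

async fn commit_with_operation_id(
    log_store: &dyn LogStoreSketch,
    version: i64,
    tmp_commit: object_store::path::Path,
) -> SketchResult<()> {
    // One id per logical operation; default stores ignore it, transactional
    // backends (e.g. LakeFS) can use it to scope their work.
    let operation_id = Uuid::new_v4();

    // Some(id) lets a transactional backend return an operation-scoped store;
    // None yields the plain backing store.
    let _store = log_store.object_store(Some(operation_id));

    match log_store
        .write_commit_entry(
            version,
            CommitOrBytes::TmpCommit(tmp_commit.clone()),
            operation_id,
        )
        .await
    {
        Ok(()) => Ok(()),
        Err(err) => {
            // On failure, aborting with the same id cleans up the staged commit.
            log_store
                .abort_commit_entry(version, CommitOrBytes::TmpCommit(tmp_commit), operation_id)
                .await?;
            Err(err)
        }
    }
}
```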
2 changes: 2 additions & 0 deletions crates/aws/src/storage.rs
@@ -830,7 +830,9 @@ mod tests {
}

#[test]
#[serial]
fn test_is_aws() {
clear_env_of_aws_keys();
let options = StorageOptions::default();
assert!(is_aws(&options));

32 changes: 20 additions & 12 deletions crates/aws/tests/integration_s3_dynamodb.rs
@@ -24,6 +24,7 @@ use object_store::path::Path;
use serde_json::Value;
use serial_test::serial;
use tracing::log::*;
use uuid::Uuid;

use maplit::hashmap;
use object_store::{PutOptions, PutPayload};
@@ -95,7 +96,7 @@ async fn test_create_s3_table() -> TestResult<()> {
let _ = pretty_env_logger::try_init();
let context = IntegrationContext::new(Box::new(S3Integration::default()))?;
let _client = make_client()?;
let table_name = format!("{}_{}", "create_test", uuid::Uuid::new_v4());
let table_name = format!("{}_{}", "create_test", Uuid::new_v4());
let table_uri = context.uri_for_table(TestTables::Custom(table_name.to_owned()));

let schema = StructType::new(vec![StructField::new(
@@ -113,7 +114,7 @@ async fn test_create_s3_table() -> TestResult<()> {

let payload = PutPayload::from_static(b"test-drivin");
let _put = log_store
.object_store()
.object_store(None)
.put_opts(
&Path::from("_delta_log/_commit_failed.tmp"),
payload,
@@ -138,10 +139,7 @@ async fn get_missing_item() -> TestResult<()> {
let client = make_client()?;
let version = i64::MAX;
let result = client
.get_commit_entry(
&format!("s3a://my_delta_table_{}", uuid::Uuid::new_v4()),
version,
)
.get_commit_entry(&format!("s3a://my_delta_table_{}", Uuid::new_v4()), version)
.await;
assert_eq!(result.unwrap(), None);
Ok(())
@@ -186,7 +184,7 @@ async fn test_repair_commit_entry() -> TestResult<()> {
// create another incomplete log entry, this time move the temporary file already
let entry = create_incomplete_commit_entry(&table, 2, "unfinished_commit").await?;
log_store
.object_store()
.object_store(None)
.rename_if_not_exists(&entry.temp_path, &commit_uri_from_version(entry.version))
.await?;

@@ -253,6 +251,7 @@ async fn test_abort_commit_entry() -> TestResult<()> {
.abort_commit_entry(
entry.version,
CommitOrBytes::TmpCommit(entry.temp_path.clone()),
Uuid::new_v4(),
)
.await?;

@@ -262,13 +261,17 @@ async fn test_abort_commit_entry() -> TestResult<()> {
}
// Temp commit file should have been deleted
assert!(matches!(
log_store.object_store().get(&entry.temp_path).await,
log_store.object_store(None).get(&entry.temp_path).await,
Err(ObjectStoreError::NotFound { .. })
));

// Test abort commit is idempotent - still works if already aborted
log_store
.abort_commit_entry(entry.version, CommitOrBytes::TmpCommit(entry.temp_path))
.abort_commit_entry(
entry.version,
CommitOrBytes::TmpCommit(entry.temp_path),
Uuid::new_v4(),
)
.await?;

Ok(())
@@ -301,14 +304,19 @@ async fn test_abort_commit_entry_fail_to_delete_entry() -> TestResult<()> {
log_store
.abort_commit_entry(
entry.version,
CommitOrBytes::TmpCommit(entry.temp_path.clone())
CommitOrBytes::TmpCommit(entry.temp_path.clone()),
Uuid::new_v4(),
)
.await,
Err(_),
));

// Check temp commit file still exists
assert!(log_store.object_store().get(&entry.temp_path).await.is_ok());
assert!(log_store
.object_store(None)
.get(&entry.temp_path)
.await
.is_ok());

Ok(())
}
@@ -439,7 +447,7 @@ fn add_action(name: &str) -> Action {
}

async fn prepare_table(context: &IntegrationContext, table_name: &str) -> TestResult<DeltaTable> {
let table_name = format!("{}_{}", table_name, uuid::Uuid::new_v4());
let table_name = format!("{}_{}", table_name, Uuid::new_v4());
let table_uri = context.uri_for_table(TestTables::Custom(table_name.to_owned()));
let schema = StructType::new(vec![StructField::new(
"Id".to_string(),
2 changes: 1 addition & 1 deletion crates/aws/tests/repair_s3_rename_test.rs
@@ -120,7 +120,7 @@ fn create_s3_backend(
.with_allow_http(true)
.build_storage()
.unwrap()
.object_store();
.object_store(None);

let delayed_store = DelayedObjectStore {
inner: store,
2 changes: 1 addition & 1 deletion crates/azure/tests/integration.rs
@@ -75,7 +75,7 @@ async fn read_write_test_onelake(context: &IntegrationContext, path: &Path) -> T
let delta_store = DeltaTableBuilder::from_uri(&context.root_uri())
.with_allow_http(true)
.build_storage()?
.object_store();
.object_store(None);

let expected = Bytes::from_static(b"test world from delta-rs on friday");

4 changes: 2 additions & 2 deletions crates/core/src/delta_datafusion/mod.rs
@@ -282,7 +282,7 @@ impl DeltaTableState {
pub(crate) fn register_store(store: LogStoreRef, env: Arc<RuntimeEnv>) {
let object_store_url = store.object_store_url();
let url: &Url = object_store_url.as_ref();
env.register_object_store(url, store.object_store());
env.register_object_store(url, store.object_store(None));
}

/// The logical schema for a Deltatable is different from the protocol level schema since partition
@@ -2708,7 +2708,7 @@ mod tests {
.unwrap();

let (object_store, mut operations) =
RecordingObjectStore::new(table.log_store().object_store());
RecordingObjectStore::new(table.log_store().object_store(None));
let log_store =
DefaultLogStore::new(Arc::new(object_store), table.log_store().config().clone());
let provider = DeltaTableProvider::try_new(