Skip to content

Commit

Permalink
Merge pull request #26 from delta-io/main
Browse files Browse the repository at this point in the history
Merging from fork
  • Loading branch information
gregott-rel authored Oct 5, 2024
2 parents 5d10cbe + 8701046 commit ba708e7
Show file tree
Hide file tree
Showing 26 changed files with 538 additions and 73 deletions.
256 changes: 256 additions & 0 deletions CHANGELOG.md

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions crates/aws/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "deltalake-aws"
version = "0.3.0"
version = "0.4.0"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
Expand All @@ -12,7 +12,7 @@ repository.workspace = true
rust-version.workspace = true

[dependencies]
deltalake-core = { version = "0.20.0", path = "../core" }
deltalake-core = { version = "0.21.0", path = "../core" }
aws-smithy-runtime-api = { version="1.7" }
aws-smithy-runtime = { version="1.7", optional = true}
aws-credential-types = { version="1.2", features = ["hardcoded-credentials"]}
Expand Down
9 changes: 5 additions & 4 deletions crates/aws/src/logstore/default_logstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,8 @@
use std::sync::Arc;

use bytes::Bytes;
use deltalake_core::logstore::*;
use deltalake_core::{
logstore::{
abort_commit_entry, get_latest_version, read_commit_entry, write_commit_entry,
CommitOrBytes, LogStore, LogStoreConfig,
},
operations::transaction::TransactionError,
storage::{ObjectStoreRef, StorageOptions},
DeltaResult,
Expand Down Expand Up @@ -103,6 +100,10 @@ impl LogStore for S3LogStore {
get_latest_version(self, current_version).await
}

/// Find the earliest version currently stored in the delta log.
///
/// Delegates to the shared `get_earliest_version` function from
/// `deltalake_core::logstore`, passing this store and `current_version`
/// through unchanged.
async fn get_earliest_version(&self, current_version: i64) -> DeltaResult<i64> {
get_earliest_version(self, current_version).await
}

fn object_store(&self) -> Arc<dyn ObjectStore> {
self.storage.clone()
}
Expand Down
4 changes: 4 additions & 0 deletions crates/aws/src/logstore/dynamodb_logstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,10 @@ impl LogStore for S3DynamoDbLogStore {
}
}

/// Find the earliest version currently stored in the delta log.
///
/// Delegates to the free function `get_earliest_version` (presumably the
/// helper from `deltalake_core::logstore` — confirm against this file's
/// imports), passing this store and `current_version` through unchanged.
async fn get_earliest_version(&self, current_version: i64) -> DeltaResult<i64> {
get_earliest_version(self, current_version).await
}

fn object_store(&self) -> ObjectStoreRef {
self.storage.clone()
}
Expand Down
4 changes: 2 additions & 2 deletions crates/azure/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "deltalake-azure"
version = "0.3.0"
version = "0.4.0"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
Expand All @@ -12,7 +12,7 @@ repository.workspace = true
rust-version.workspace = true

[dependencies]
deltalake-core = { version = "0.20.0", path = "../core" }
deltalake-core = { version = "0.21.0", path = "../core" }
lazy_static = "1"

# workspace dependencies
Expand Down
4 changes: 2 additions & 2 deletions crates/catalog-glue/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "deltalake-catalog-glue"
version = "0.4.0"
version = "0.5.0"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
Expand All @@ -15,7 +15,7 @@ rust-version.workspace = true
async-trait = { workspace = true }
aws-config = "1"
aws-sdk-glue = "1"
deltalake-core = { version = "0.20.0", path = "../core" }
deltalake-core = { version = "0.21.0", path = "../core" }
thiserror = { workspace = true }

[dev-dependencies]
Expand Down
2 changes: 1 addition & 1 deletion crates/core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "deltalake-core"
version = "0.20.0"
version = "0.21.0"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
Expand Down
42 changes: 37 additions & 5 deletions crates/core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -644,7 +644,15 @@ impl<'a> DeltaScanBuilder<'a> {
let mut exec_plan_builder = ParquetExecBuilder::new(FileScanConfig {
object_store_url: self.log_store.object_store_url(),
file_schema,
file_groups: file_groups.into_values().collect(),
// If all files were filtered out, we still need to emit at least one partition to
// pass datafusion sanity checks.
//
// See https://github.com/apache/datafusion/issues/11322
file_groups: if file_groups.is_empty() {
vec![vec![]]
} else {
file_groups.into_values().collect()
},
statistics: stats,
projection: self.projection.cloned(),
limit: self.limit,
Expand Down Expand Up @@ -1764,8 +1772,11 @@ impl From<Column> for DeltaColumn {

#[cfg(test)]
mod tests {
use arrow_array::StructArray;
use arrow_schema::Schema;
use crate::operations::create::CreateBuilder;
use crate::operations::write::SchemaMode;
use crate::writer::test_utils::get_delta_schema;
use arrow::array::StructArray;
use arrow::datatypes::{Field, Schema};
use chrono::{TimeZone, Utc};
use datafusion::assert_batches_sorted_eq;
use datafusion::datasource::physical_plan::ParquetExec;
Expand All @@ -1774,13 +1785,12 @@ mod tests {
use datafusion_expr::lit;
use datafusion_proto::physical_plan::AsExecutionPlan;
use datafusion_proto::protobuf;
use delta_kernel::schema::StructField;
use object_store::path::Path;
use serde_json::json;
use std::ops::Deref;

use super::*;
use crate::operations::write::SchemaMode;
use crate::writer::test_utils::get_delta_schema;

// test deserialization of serialized partition values.
// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#partition-value-serialization
Expand Down Expand Up @@ -2566,4 +2576,26 @@ mod tests {
Ok(true)
}
}

#[tokio::test]
async fn passes_sanity_checker_when_all_files_filtered() {
    // Run a query that filters out all files and sorts.
    // Verify that it returns an empty set of rows without panicking.
    //
    // Historically, we had a bug that caused us to emit a query plan with 0 partitions, which
    // datafusion rejected.
    let table = crate::open_table("../test/tests/data/delta-2.2.0-partitioned-types")
        .await
        .unwrap();
    let ctx = SessionContext::new();
    ctx.register_table("test", Arc::new(table)).unwrap();

    let df = ctx
        .sql("select * from test where c3 = 100 ORDER BY c1 ASC")
        .await
        .unwrap();
    let actual = df.collect().await.unwrap();

    // Count rows rather than record batches: an empty result may legitimately be
    // delivered as zero batches or as one or more zero-row batches.
    let total_rows: usize = actual.iter().map(|batch| batch.num_rows()).sum();
    assert_eq!(total_rows, 0);
}
}
4 changes: 4 additions & 0 deletions crates/core/src/logstore/default_logstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ impl LogStore for DefaultLogStore {
super::get_latest_version(self, current_version).await
}

/// Find the earliest version currently stored in the delta log.
///
/// Delegates to the parent module's `get_earliest_version` function, passing
/// this store and `current_version` through unchanged.
async fn get_earliest_version(&self, current_version: i64) -> DeltaResult<i64> {
super::get_earliest_version(self, current_version).await
}

fn object_store(&self) -> Arc<dyn ObjectStore> {
self.storage.clone()
}
Expand Down
52 changes: 51 additions & 1 deletion crates/core/src/logstore/mod.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
//! Delta log store.
use std::cmp::min;
use std::io::{BufRead, BufReader, Cursor};
use std::sync::OnceLock;
use std::{cmp::max, collections::HashMap, sync::Arc};

use bytes::Bytes;
use dashmap::DashMap;
use futures::StreamExt;
use futures::{StreamExt, TryStreamExt};
use lazy_static::lazy_static;
use object_store::{path::Path, Error as ObjectStoreError, ObjectStore};
use regex::Regex;
Expand Down Expand Up @@ -213,6 +214,9 @@ pub trait LogStore: Sync + Send {
/// Find latest version currently stored in the delta log.
async fn get_latest_version(&self, start_version: i64) -> DeltaResult<i64>;

/// Find earliest version currently stored in the delta log.
///
/// `start_version` is the version the lookup begins from; the default
/// `get_earliest_version` helper in this module falls back to it when no
/// last checkpoint is found.
async fn get_earliest_version(&self, start_version: i64) -> DeltaResult<i64>;

/// Get underlying object store.
fn object_store(&self) -> Arc<dyn ObjectStore>;

Expand Down Expand Up @@ -441,6 +445,52 @@ pub async fn get_latest_version(
Ok(version)
}

/// Default implementation for retrieving the earliest version.
///
/// The search starts from the version recorded in the last checkpoint or, when
/// no checkpoint is found, from `current_version`. Every object listed under the
/// log path whose location sorts before that version's commit file is inspected,
/// and the smallest version parsed from those file names is returned (or the
/// starting version itself when nothing smaller is found).
///
/// # Errors
///
/// Returns an error when reading the last checkpoint fails for any reason other
/// than the checkpoint being absent, when listing the object store fails, or
/// when the resulting version is negative (`DeltaTableError::not_a_table`).
pub async fn get_earliest_version(
    log_store: &dyn LogStore,
    current_version: i64,
) -> DeltaResult<i64> {
    // Upper bound of the search: the last checkpoint if there is one, otherwise
    // the version handed in by the caller.
    let upper_bound = match get_last_checkpoint(log_store).await {
        Ok(checkpoint) => checkpoint.version,
        Err(ProtocolError::CheckpointNotFound) => current_version,
        Err(other) => return Err(DeltaTableError::from(other)),
    };

    let cutoff = commit_uri_from_version(upper_bound);
    let store = log_store.object_store();

    // Manually filter until we can provide direction in https://github.com/apache/arrow-rs/issues/6274
    let mut candidates = store
        .list(Some(log_store.log_path()))
        .try_filter(move |meta| futures::future::ready(meta.location < cutoff))
        .boxed();

    // Track the smallest version seen among the listed log files.
    let mut earliest = upper_bound;
    while let Some(meta) = candidates.try_next().await? {
        if let Some(found) = extract_version_from_filename(meta.location.as_ref()) {
            earliest = min(earliest, found);
        }
    }

    if earliest < 0 {
        Err(DeltaTableError::not_a_table(log_store.root_uri()))
    } else {
        Ok(earliest)
    }
}

/// Read delta log for a specific version
pub async fn read_commit_entry(
storage: &dyn ObjectStore,
Expand Down
21 changes: 16 additions & 5 deletions crates/core/src/operations/merge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1154,6 +1154,14 @@ async fn execute(
.select(write_projection.clone())?
.with_column(CDC_COLUMN_NAME, lit("insert"))?,
);

let after = cdc_projection
.clone()
.filter(col(TARGET_COLUMN).is_true())?
.select(write_projection.clone())?;

// Extra select_columns is required so that before and after have same schema order
// DataFusion doesn't have UnionByName yet, see https://github.com/apache/datafusion/issues/12650
let before = cdc_projection
.clone()
.filter(col(crate::delta_datafusion::PATH_COLUMN).is_not_null())?
Expand All @@ -1164,13 +1172,16 @@ async fn execute(
.filter(|c| c.name != crate::delta_datafusion::PATH_COLUMN)
.map(|c| Expr::Column(c.clone()))
.collect_vec(),
)?
.select_columns(
&after
.schema()
.columns()
.iter()
.map(|v| v.name())
.collect::<Vec<_>>(),
)?;

let after = cdc_projection
.clone()
.filter(col(TARGET_COLUMN).is_true())?
.select(write_projection.clone())?;

let tracker = CDCTracker::new(before, after);
change_data.push(tracker.collect()?);
}
Expand Down
3 changes: 2 additions & 1 deletion crates/core/src/operations/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,8 @@ impl MergePlan {
})
.boxed(),
OptimizeOperations::ZOrder(zorder_columns, bins) => {
debug!("Starting zorder with the columns: {zorder_columns:?} {bins:?}");

#[cfg(not(feature = "datafusion"))]
let exec_context = Arc::new(zorder::ZOrderExecContext::new(
zorder_columns,
Expand All @@ -729,7 +731,6 @@ impl MergePlan {
bins.len() <= num_cpus::get(),
));

debug!("Starting zorder with the columns: {zorder_columns:?} {bins:?}");
#[cfg(feature = "datafusion")]
let exec_context = Arc::new(zorder::ZOrderExecContext::new(
zorder_columns,
Expand Down
3 changes: 2 additions & 1 deletion crates/core/src/operations/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ use object_store::path::Path;
use object_store::Error as ObjectStoreError;
use serde_json::Value;

use self::conflict_checker::{CommitConflictError, TransactionInfo, WinningCommitSummary};
use self::conflict_checker::{TransactionInfo, WinningCommitSummary};
use crate::checkpoints::{cleanup_expired_logs_for, create_checkpoint_for};
use crate::errors::DeltaTableError;
use crate::kernel::{
Expand All @@ -97,6 +97,7 @@ use crate::table::config::TableConfig;
use crate::table::state::DeltaTableState;
use crate::{crate_version, DeltaResult};

pub use self::conflict_checker::CommitConflictError;
pub use self::protocol::INSTANCE as PROTOCOL;

#[cfg(test)]
Expand Down
5 changes: 5 additions & 0 deletions crates/core/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,11 @@ impl DeltaTable {
self.log_store.get_latest_version(self.version()).await
}

/// Returns the earliest available version of the table.
///
/// Delegates to the log store's `get_earliest_version`, passing the currently
/// loaded table version (`self.version()`) as the starting point.
pub async fn get_earliest_version(&self) -> Result<i64, DeltaTableError> {
self.log_store.get_earliest_version(self.version()).await
}

/// Currently loaded version of the table
pub fn version(&self) -> i64 {
self.state.as_ref().map(|s| s.version()).unwrap_or(-1)
Expand Down
14 changes: 7 additions & 7 deletions crates/deltalake/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "deltalake"
version = "0.20.0"
version = "0.21.0"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
Expand All @@ -16,12 +16,12 @@ rust-version.workspace = true
features = ["azure", "datafusion", "gcs", "hdfs", "json", "python", "s3", "unity-experimental"]

[dependencies]
deltalake-core = { version = "0.20.0", path = "../core" }
deltalake-aws = { version = "0.3.0", path = "../aws", default-features = false, optional = true }
deltalake-azure = { version = "0.3.0", path = "../azure", optional = true }
deltalake-gcp = { version = "0.4.0", path = "../gcp", optional = true }
deltalake-hdfs = { version = "0.4.0", path = "../hdfs", optional = true }
deltalake-catalog-glue = { version = "0.4.0", path = "../catalog-glue", optional = true }
deltalake-core = { version = "0.21.0", path = "../core" }
deltalake-aws = { version = "0.4.0", path = "../aws", default-features = false, optional = true }
deltalake-azure = { version = "0.4.0", path = "../azure", optional = true }
deltalake-gcp = { version = "0.5.0", path = "../gcp", optional = true }
deltalake-hdfs = { version = "0.5.0", path = "../hdfs", optional = true }
deltalake-catalog-glue = { version = "0.5.0", path = "../catalog-glue", optional = true }

[features]
# All of these features are just reflected into the core crate until that
Expand Down
4 changes: 2 additions & 2 deletions crates/gcp/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "deltalake-gcp"
version = "0.4.0"
version = "0.5.0"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
Expand All @@ -12,7 +12,7 @@ repository.workspace = true
rust-version.workspace = true

[dependencies]
deltalake-core = { version = "0.20.0", path = "../core" }
deltalake-core = { version = "0.21.0", path = "../core" }
lazy_static = "1"

# workspace dependencies
Expand Down
4 changes: 2 additions & 2 deletions crates/hdfs/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "deltalake-hdfs"
version = "0.4.0"
version = "0.5.0"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
Expand All @@ -12,7 +12,7 @@ repository.workspace = true
rust-version.workspace = true

[dependencies]
deltalake-core = { version = "0.20.0", path = "../core" }
deltalake-core = { version = "0.21.0", path = "../core" }
hdfs-native-object-store = "0.11"

# workspace dependencies
Expand Down
Loading

0 comments on commit ba708e7

Please sign in to comment.