Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: datafusion 34, arrow & parquet 49 #1983

Closed
wants to merge 10 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 19 additions & 21 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
[workspace]
members = [
"crates/*",
"delta-inspect",
"python",
]
members = ["crates/*", "delta-inspect", "python"]
exclude = ["proofs"]
resolver = "2"

Expand All @@ -19,24 +15,26 @@ debug = "line-tables-only"

[workspace.dependencies]
# arrow
arrow = { version = "48.0.1" }
arrow-arith = { version = "48.0.1" }
arrow-array = { version = "48.0.1" }
arrow-buffer = { version = "48.0.1" }
arrow-cast = { version = "48.0.1" }
arrow-ord = { version = "48.0.1" }
arrow-row = { version = "48.0.1" }
arrow-schema = { version = "48.0.1" }
arrow-select = { version = "48.0.1" }
parquet = { version = "48.0.1" }
arrow = { version = "49.0.0" }
arrow-arith = { version = "49.0.0" }
arrow-array = { version = "49.0.0" }
arrow-buffer = { version = "49.0.0" }
arrow-cast = { version = "49.0.0" }
arrow-ord = { version = "49.0.0" }
arrow-row = { version = "49.0.0" }
arrow-schema = { version = "49.0.0" }
arrow-select = { version = "49.0.0" }
parquet = { version = "49.0.0" }

object_store = "0.8"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved object_store to a workspace dependency as it's referenced in two separate crates - easier to keep the dependency in sync this way.


# datafusion
datafusion = { version = "33.0.0" }
datafusion-expr = { version = "33.0.0" }
datafusion-common = { version = "33.0.0" }
datafusion-proto = { version = "33.0.0" }
datafusion-sql = { version = "33.0.0" }
datafusion-physical-expr = { version = "33.0.0" }
datafusion = { version = "34.0.0" }
datafusion-expr = { version = "34.0.0" }
datafusion-common = { version = "34.0.0" }
datafusion-proto = { version = "34.0.0" }
datafusion-sql = { version = "34.0.0" }
datafusion-physical-expr = { version = "34.0.0" }


# serde
Expand Down
2 changes: 1 addition & 1 deletion crates/deltalake-aws/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ rusoto_core = { version = "0.47", default-features = false, optional = true }
rusoto_credential = { version = "0.47", optional = true }
rusoto_sts = { version = "0.47", default-features = false, optional = true }
rusoto_dynamodb = { version = "0.47", default-features = false, optional = true }
object_store = "0.7"
object_store = { workspace = true }
lazy_static = "1"
maplit = "1"
thiserror = { workspace = true }
Expand Down
15 changes: 12 additions & 3 deletions crates/deltalake-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,16 @@ edition = "2021"
[package.metadata.docs.rs]
# We cannot use all_features because TLS features are mutually exclusive.
# We cannot use hdfs feature because it requires Java to be installed.
features = ["azure", "datafusion", "gcs", "hdfs", "json", "python", "s3", "unity-experimental"]
features = [
"azure",
"datafusion",
"gcs",
"hdfs",
"json",
"python",
"s3",
"unity-experimental",
]

[dependencies]
# arrow
Expand Down Expand Up @@ -78,7 +87,7 @@ log = "0"
libc = ">=0.2.90, <1"
num-bigint = "0.4"
num-traits = "0.2.15"
object_store = "0.7"
object_store = { workspace = true }
once_cell = "1.16.0"
parking_lot = "0.12"
parquet2 = { version = "0.17", optional = true }
Expand Down Expand Up @@ -111,7 +120,7 @@ reqwest = { version = "0.11.18", default-features = false, features = [
# Datafusion
dashmap = { version = "5", optional = true }

sqlparser = { version = "0.39", optional = true }
sqlparser = { version = "0.40", optional = true }

# NOTE dependencies only for integration tests
fs_extra = { version = "1.3.0", optional = true }
Expand Down
2 changes: 1 addition & 1 deletion crates/deltalake-core/src/data_catalog/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl ListingSchemaProvider {

/// Reload table information from ObjectStore
pub async fn refresh(&self) -> datafusion_common::Result<()> {
let entries: Vec<_> = self.store.list(None).await?.try_collect().await?;
let entries: Vec<_> = self.store.list(None).try_collect().await?;
let mut tables = HashSet::new();
for file in entries.iter() {
let mut parent = Path::new(file.location.as_ref());
Expand Down
6 changes: 2 additions & 4 deletions crates/deltalake-core/src/delta_datafusion/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ use datafusion::execution::context::SessionState;
use datafusion_common::Result as DFResult;
use datafusion_common::{config::ConfigOptions, DFSchema, Result, ScalarValue, TableReference};
use datafusion_expr::{
expr::{InList, ScalarUDF},
AggregateUDF, Between, BinaryExpr, Cast, Expr, Like, TableSource,
expr::InList, AggregateUDF, Between, BinaryExpr, Cast, Expr, Like, TableSource,
};
use datafusion_sql::planner::{ContextProvider, SqlToRel};
use sqlparser::ast::escape_quoted_string;
Expand Down Expand Up @@ -186,8 +185,7 @@ impl<'a> Display for SqlFormat<'a> {
Expr::IsNotFalse(expr) => write!(f, "{} IS NOT FALSE", SqlFormat { expr }),
Expr::IsNotUnknown(expr) => write!(f, "{} IS NOT UNKNOWN", SqlFormat { expr }),
Expr::BinaryExpr(expr) => write!(f, "{}", BinaryExprFormat { expr }),
Expr::ScalarFunction(func) => fmt_function(f, &func.fun.to_string(), false, &func.args),
Expr::ScalarUDF(ScalarUDF { fun, args }) => fmt_function(f, &fun.name, false, args),
Expr::ScalarFunction(func) => fmt_function(f, &func.func_def.name(), false, &func.args),
Expr::Cast(Cast { expr, data_type }) => {
write!(f, "arrow_cast({}, '{}')", SqlFormat { expr }, data_type)
}
Expand Down
35 changes: 18 additions & 17 deletions crates/deltalake-core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ use datafusion::datasource::{listing::PartitionedFile, MemTable, TableProvider,
use datafusion::execution::context::{SessionConfig, SessionContext, SessionState, TaskContext};
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::execution::FunctionRegistry;
use datafusion::optimizer::utils::conjunction;
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
use datafusion::physical_plan::filter::FilterExec;
Expand All @@ -60,8 +59,9 @@ use datafusion_common::scalar::ScalarValue;
use datafusion_common::stats::Precision;
use datafusion_common::tree_node::{TreeNode, TreeNodeVisitor, VisitRecursion};
use datafusion_common::{Column, DataFusionError, Result as DataFusionResult, ToDFSchema};
use datafusion_expr::expr::{ScalarFunction, ScalarUDF};
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::logical_plan::CreateExternalTable;
use datafusion_expr::utils::conjunction;
use datafusion_expr::{col, Expr, Extension, LogicalPlan, TableProviderFilterPushDown, Volatility};
use datafusion_physical_expr::execution_props::ExecutionProps;
use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
Expand Down Expand Up @@ -1278,26 +1278,26 @@ impl TreeNodeVisitor for FindFilesExprProperties {
| Expr::Case(_)
| Expr::Cast(_)
| Expr::TryCast(_) => (),
Expr::ScalarFunction(ScalarFunction { fun, .. }) => {
let v = fun.volatility();
if v > Volatility::Immutable {
self.result = Err(DeltaTableError::Generic(format!(
"Find files predicate contains nondeterministic function {}",
fun
)));
return Ok(VisitRecursion::Stop);
}
}
Expr::ScalarUDF(ScalarUDF { fun, .. }) => {
let v = fun.signature.volatility;
Expr::ScalarFunction(ScalarFunction { func_def, .. }) => {
let v = match func_def {
datafusion_expr::ScalarFunctionDefinition::BuiltIn(f) => f.volatility(),
datafusion_expr::ScalarFunctionDefinition::UDF(u) => u.signature().volatility,
datafusion_expr::ScalarFunctionDefinition::Name(n) => {
self.result = Err(DeltaTableError::Generic(format!(
"Cannot determine volatility of find files predicate function {n}",
)));
return Ok(VisitRecursion::Stop);
}
};
if v > Volatility::Immutable {
self.result = Err(DeltaTableError::Generic(format!(
"Find files predicate contains nondeterministic function {}",
fun.name
func_def.name()
)));
return Ok(VisitRecursion::Stop);
}
}

_ => {
self.result = Err(DeltaTableError::Generic(format!(
"Find files predicate contains unsupported expression {}",
Expand Down Expand Up @@ -1764,7 +1764,8 @@ mod tests {
location: Path::from("year=2015/month=1/part-00000-4dcb50d3-d017-450c-9df7-a7257dbd3c5d-c000.snappy.parquet".to_string()),
last_modified: Utc.timestamp_millis_opt(1660497727833).unwrap(),
size: 10644,
e_tag: None
e_tag: None,
version : None
},
partition_values: [ScalarValue::Int64(Some(2015)), ScalarValue::Int64(Some(1))].to_vec(),
range: None,
Expand Down Expand Up @@ -1854,7 +1855,7 @@ mod tests {
]));
let exec_plan = Arc::from(DeltaScan {
table_uri: "s3://my_bucket/this/is/some/path".to_string(),
parquet_scan: Arc::from(EmptyExec::new(false, schema.clone())),
parquet_scan: Arc::from(EmptyExec::new(schema.clone())),
config: DeltaScanConfig::default(),
logical_schema: schema.clone(),
});
Expand Down
4 changes: 2 additions & 2 deletions crates/deltalake-core/src/logstore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ pub trait LogStore: Sync + Send {
async fn is_delta_table_location(&self) -> DeltaResult<bool> {
// TODO We should really be using HEAD here, but this fails in windows tests
let object_store = self.object_store();
let mut stream = object_store.list(Some(self.log_path())).await?;
let mut stream = object_store.list(Some(self.log_path()));
if let Some(res) = stream.next().await {
match res {
Ok(_) => Ok(true),
Expand Down Expand Up @@ -272,7 +272,7 @@ async fn get_latest_version(log_store: &dyn LogStore, current_version: i64) -> D
let prefix = Some(log_store.log_path());
let offset_path = commit_uri_from_version(max_version);
let object_store = log_store.object_store();
let mut files = object_store.list_with_offset(prefix, &offset_path).await?;
let mut files = object_store.list_with_offset(prefix, &offset_path);

while let Some(obj_meta) = files.next().await {
let obj_meta = obj_meta?;
Expand Down
1 change: 0 additions & 1 deletion crates/deltalake-core/src/operations/convert_to_delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,6 @@ impl ConvertToDeltaBuilder {
let mut files = Vec::new();
object_store
.list(None)
.await?
.try_for_each_concurrent(10, |meta| {
if Some("parquet") == meta.location.extension() {
debug!("Found parquet file {:#?}", meta.location);
Expand Down
2 changes: 1 addition & 1 deletion crates/deltalake-core/src/operations/filesystem_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ impl FileSystemCheckBuilder {
}

let object_store = log_store.object_store();
let mut files = object_store.list(None).await?;
let mut files = object_store.list(None);
while let Some(result) = files.next().await {
let file = result?;
files_relative.remove(file.location.as_ref());
Expand Down
19 changes: 9 additions & 10 deletions crates/deltalake-core/src/operations/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -533,8 +533,7 @@ impl MergePlan {
context: Arc<zorder::ZOrderExecContext>,
) -> Result<BoxStream<'static, Result<RecordBatch, ParquetError>>, DeltaTableError> {
use datafusion::prelude::{col, ParquetReadOptions};
use datafusion_expr::expr::ScalarUDF;
use datafusion_expr::Expr;
use datafusion_expr::{expr::ScalarFunction, Expr};

let locations = files
.iter()
Expand All @@ -555,7 +554,7 @@ impl MergePlan {
// Add a temporary z-order column we will sort by, and then drop.
const ZORDER_KEY_COLUMN: &str = "__zorder_key";
let cols = context.columns.iter().map(col).collect_vec();
let expr = Expr::ScalarUDF(ScalarUDF::new(
let expr = Expr::ScalarFunction(ScalarFunction::new_udf(
Arc::new(zorder::datafusion::zorder_key_udf()),
cols,
));
Expand Down Expand Up @@ -1136,7 +1135,10 @@ pub(super) mod zorder {
};
use arrow_schema::DataType;
use datafusion_common::DataFusionError;
use datafusion_expr::{ColumnarValue, ScalarUDF, Signature, TypeSignature, Volatility};
use datafusion_expr::{
ColumnarValue, ReturnTypeFunction, ScalarFunctionImplementation, ScalarUDF, Signature,
TypeSignature, Volatility,
};
use itertools::Itertools;

pub const ZORDER_UDF_NAME: &str = "zorder_key";
Expand Down Expand Up @@ -1172,12 +1174,9 @@ pub(super) mod zorder {
type_signature: TypeSignature::VariadicAny,
volatility: Volatility::Immutable,
};
ScalarUDF {
name: ZORDER_UDF_NAME.to_string(),
signature,
return_type: Arc::new(|_| Ok(Arc::new(DataType::Binary))),
fun: Arc::new(zorder_key_datafusion),
}
let return_type: ReturnTypeFunction = Arc::new(|_| Ok(Arc::new(DataType::Binary)));
let fun: ScalarFunctionImplementation = Arc::new(zorder_key_datafusion);
ScalarUDF::new(ZORDER_UDF_NAME, &signature, &return_type, &fun)
}

/// Datafusion zorder UDF body
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ use arrow::datatypes::{
};
use datafusion::datasource::physical_plan::wrap_partition_type_in_dict;
use datafusion::execution::context::SessionState;
use datafusion::optimizer::utils::conjunction;
use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
use datafusion_common::scalar::ScalarValue;
use datafusion_common::{Column, DFSchema};
use datafusion_expr::utils::conjunction;
use datafusion_expr::Expr;
use itertools::Either;
use object_store::ObjectStore;
Expand Down
5 changes: 1 addition & 4 deletions crates/deltalake-core/src/operations/vacuum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,7 @@ impl VacuumBuilder {
let mut files_to_delete = vec![];
let mut file_sizes = vec![];
let object_store = self.log_store.object_store();
let mut all_files = object_store
.list(None)
.await
.map_err(DeltaTableError::from)?;
let mut all_files = object_store.list(None);
let partition_columns = &self
.snapshot
.metadata()
Expand Down
1 change: 0 additions & 1 deletion crates/deltalake-core/src/protocol/checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,6 @@ pub async fn cleanup_expired_logs_for(
.delete_stream(
object_store
.list(Some(log_store.log_path()))
.await?
// This predicate function will filter out any locations that don't
// match the given timestamp range
.filter_map(|meta: Result<crate::ObjectMeta, _>| async move {
Expand Down
2 changes: 1 addition & 1 deletion crates/deltalake-core/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,7 @@ pub(crate) async fn find_latest_check_point_for_version(

let mut cp: Option<CheckPoint> = None;
let object_store = log_store.object_store();
let mut stream = object_store.list(Some(log_store.log_path())).await?;
let mut stream = object_store.list(Some(log_store.log_path()));

while let Some(obj_meta) = stream.next().await {
// Exit early if any objects can't be listed.
Expand Down
18 changes: 14 additions & 4 deletions crates/deltalake-core/src/storage/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use object_store::{
GetResult, ListResult, MultipartId, ObjectMeta as ObjStoreObjectMeta, ObjectStore,
Result as ObjectStoreResult,
};
use object_store::{PutOptions, PutResult};
use std::ops::Range;
use std::sync::Arc;
use tokio::io::AsyncWrite;
Expand Down Expand Up @@ -166,10 +167,19 @@ impl std::fmt::Display for FileStorageBackend {

#[async_trait::async_trait]
impl ObjectStore for FileStorageBackend {
async fn put(&self, location: &ObjectStorePath, bytes: Bytes) -> ObjectStoreResult<()> {
async fn put(&self, location: &ObjectStorePath, bytes: Bytes) -> ObjectStoreResult<PutResult> {
self.inner.put(location, bytes).await
}

async fn put_opts(
&self,
location: &ObjectStorePath,
bytes: Bytes,
options: PutOptions,
) -> ObjectStoreResult<PutResult> {
self.inner.put_opts(location, bytes, options).await
}

async fn get(&self, location: &ObjectStorePath) -> ObjectStoreResult<GetResult> {
self.inner.get(location).await
}
Expand Down Expand Up @@ -198,11 +208,11 @@ impl ObjectStore for FileStorageBackend {
self.inner.delete(location).await
}

async fn list(
fn list(
&self,
prefix: Option<&ObjectStorePath>,
) -> ObjectStoreResult<BoxStream<'_, ObjectStoreResult<ObjStoreObjectMeta>>> {
self.inner.list(prefix).await
) -> BoxStream<ObjectStoreResult<ObjStoreObjectMeta>> {
self.inner.list(prefix)
}

async fn list_with_delimiter(
Expand Down
Loading
Loading