Skip to content

Commit

Permalink
chore(rust): bump arrow v51 and datafusion v37.1 (delta-io#2395)
Browse files Browse the repository at this point in the history
# Description
Update the arrow and datafusion dependencies.

# Related Issue(s)
- closes delta-io#2328

# Documentation

<!---
Share links to useful documentation
--->

---------

Co-authored-by: R. Tyler Croy <[email protected]>
Co-authored-by: Ion Koutsouris <[email protected]>
  • Loading branch information
3 people authored Apr 26, 2024
1 parent 6a7c684 commit 9d3ecbe
Show file tree
Hide file tree
Showing 20 changed files with 284 additions and 135 deletions.
45 changes: 21 additions & 24 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
[workspace]
members = [
"crates/*",
"delta-inspect",
"python",
]
members = ["crates/*", "delta-inspect", "python"]
exclude = ["proofs"]
resolver = "2"

Expand Down Expand Up @@ -31,28 +27,29 @@ debug = "line-tables-only"

[workspace.dependencies]
# arrow
arrow = { version = "50" }
arrow-arith = { version = "50" }
arrow-array = { version = "50", features = ["chrono-tz"]}
arrow-buffer = { version = "50" }
arrow-cast = { version = "50" }
arrow-ipc = { version = "50" }
arrow-json = { version = "50" }
arrow-ord = { version = "50" }
arrow-row = { version = "50" }
arrow-schema = { version = "50" }
arrow-select = { version = "50" }
arrow = { version = "51" }
arrow-arith = { version = "51" }
arrow-array = { version = "51", features = ["chrono-tz"] }
arrow-buffer = { version = "51" }
arrow-cast = { version = "51" }
arrow-ipc = { version = "51" }
arrow-json = { version = "51" }
arrow-ord = { version = "51" }
arrow-row = { version = "51" }
arrow-schema = { version = "51" }
arrow-select = { version = "51" }
object_store = { version = "0.9" }
parquet = { version = "50" }
parquet = { version = "51" }

# datafusion
datafusion = { version = "36" }
datafusion-expr = { version = "36" }
datafusion-common = { version = "36" }
datafusion-proto = { version = "36" }
datafusion-sql = { version = "36" }
datafusion-physical-expr = { version = "36" }
datafusion-functions = { version = "36" }
datafusion = { version = "37.1" }
datafusion-expr = { version = "37.1" }
datafusion-common = { version = "37.1" }
datafusion-proto = { version = "37.1" }
datafusion-sql = { version = "37.1" }
datafusion-physical-expr = { version = "37.1" }
datafusion-functions = { version = "37.1" }
datafusion-functions-array = { version = "37.1" }

# serde
serde = { version = "1.0.194", features = ["derive"] }
Expand Down
2 changes: 2 additions & 0 deletions crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ datafusion-proto = { workspace = true, optional = true }
datafusion-sql = { workspace = true, optional = true }
datafusion-physical-expr = { workspace = true, optional = true }
datafusion-functions = { workspace = true, optional = true }
datafusion-functions-array = { workspace = true, optional = true }

# serde
serde = { workspace = true, features = ["derive"] }
Expand Down Expand Up @@ -123,6 +124,7 @@ datafusion = [
"datafusion-physical-expr",
"datafusion-sql",
"datafusion-functions",
"datafusion-functions-array",
"sqlparser",
]
datafusion-ext = ["datafusion"]
Expand Down
13 changes: 7 additions & 6 deletions crates/core/src/data_catalog/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,13 @@ impl SchemaProvider for ListingSchemaProvider {
self.tables.iter().map(|t| t.key().clone()).collect()
}

async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
let location = self.tables.get(name).map(|t| t.clone())?;
let provider = open_table_with_storage_options(location, self.storage_options.0.clone())
.await
.ok()?;
Some(Arc::new(provider) as Arc<dyn TableProvider>)
async fn table(&self, name: &str) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
let Some(location) = self.tables.get(name).map(|t| t.clone()) else {
return Ok(None);
};
let provider =
open_table_with_storage_options(location, self.storage_options.0.clone()).await?;
Ok(Some(Arc::new(provider) as Arc<dyn TableProvider>))
}

fn register_table(
Expand Down
12 changes: 6 additions & 6 deletions crates/core/src/data_catalog/unity/datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use dashmap::DashMap;
use datafusion::catalog::schema::SchemaProvider;
use datafusion::catalog::{CatalogProvider, CatalogProviderList};
use datafusion::datasource::TableProvider;
use datafusion_common::DataFusionError;
use tracing::error;

use super::models::{GetTableResponse, ListCatalogsResponse, ListTableSummariesResponse};
Expand Down Expand Up @@ -180,25 +181,24 @@ impl SchemaProvider for UnitySchemaProvider {
self.table_names.clone()
}

async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
async fn table(&self, name: &str) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
let maybe_table = self
.client
.get_table(&self.catalog_name, &self.schema_name, name)
.await
.ok()?;
.map_err(|err| DataFusionError::External(Box::new(err)))?;

match maybe_table {
GetTableResponse::Success(table) => {
let table = DeltaTableBuilder::from_uri(table.storage_location)
.with_storage_options(self.storage_options.clone())
.load()
.await
.ok()?;
Some(Arc::new(table))
.await?;
Ok(Some(Arc::new(table)))
}
GetTableResponse::Error(err) => {
error!("failed to fetch table from unity catalog: {}", err.message);
None
Err(DataFusionError::External(Box::new(err)))
}
}
}
Expand Down
9 changes: 8 additions & 1 deletion crates/core/src/data_catalog/unity/models.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
//! Api models for databricks unity catalog APIs
use core::fmt;
use std::collections::HashMap;

use serde::Deserialize;

/// Error response from unity API
#[derive(Deserialize)]
#[derive(Debug, Deserialize)]
pub struct ErrorResponse {
/// The error code
pub error_code: String,
/// The error message
pub message: String,
}
impl fmt::Display for ErrorResponse {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
writeln!(f, "[{}] {}", self.error_code, self.message)
}
}
impl std::error::Error for ErrorResponse {}

/// List catalogs response
#[derive(Deserialize)]
Expand Down
93 changes: 73 additions & 20 deletions crates/core/src/delta_datafusion/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
//! Utility functions for Datafusion's Expressions
use std::{
fmt::{self, format, Display, Error, Formatter, Write},
fmt::{self, Display, Error, Formatter, Write},
sync::Arc,
};

Expand Down Expand Up @@ -76,6 +76,18 @@ impl<'a> ContextProvider for DeltaContextProvider<'a> {
fn get_table_source(&self, _name: TableReference) -> DFResult<Arc<dyn TableSource>> {
unimplemented!()
}

fn udfs_names(&self) -> Vec<String> {
unimplemented!()
}

fn udafs_names(&self) -> Vec<String> {
unimplemented!()
}

fn udwfs_names(&self) -> Vec<String> {
unimplemented!()
}
}

/// Parse a string predicate into an `Expr`
Expand Down Expand Up @@ -416,8 +428,13 @@ mod test {
use arrow_schema::DataType as ArrowDataType;
use datafusion::prelude::SessionContext;
use datafusion_common::{Column, ScalarValue, ToDFSchema};
use datafusion_expr::{cardinality, col, lit, substring, Cast, Expr, ExprSchemable};
use datafusion_expr::expr::ScalarFunction;
use datafusion_expr::{
col, lit, substring, BinaryExpr, Cast, Expr, ExprSchemable, ScalarFunctionDefinition,
};
use datafusion_functions::core::arrow_cast;
use datafusion_functions::encoding::expr_fn::decode;
use datafusion_functions_array::expr_fn::cardinality;

use crate::delta_datafusion::{DataFusionMixins, DeltaSessionContext};
use crate::kernel::{ArrayType, DataType, PrimitiveType, StructField, StructType};
Expand Down Expand Up @@ -539,13 +556,24 @@ mod test {

// String expression that we output must be parsable for conflict resolution.
let tests = vec![
simple!(
Expr::Cast(Cast {
ParseTest {
expr: Expr::Cast(Cast {
expr: Box::new(lit(1_i64)),
data_type: ArrowDataType::Int32
}),
"arrow_cast(1, 'Int32')".to_string()
),
expected: "arrow_cast(1, 'Int32')".to_string(),
override_expected_expr: Some(
datafusion_expr::Expr::ScalarFunction(
ScalarFunction {
func_def: ScalarFunctionDefinition::UDF(arrow_cast()),
args: vec![
lit(ScalarValue::Int64(Some(1))),
lit(ScalarValue::Utf8(Some("Int32".into())))
]
}
)
),
},
simple!(
Expr::Column(Column::from_qualified_name_ignore_case("Value3")).eq(lit(3_i64)),
"Value3 = 3".to_string()
Expand Down Expand Up @@ -624,9 +652,8 @@ mod test {
substring(col("modified"), lit(0_i64), lit(4_i64)).eq(lit("2021")),
"substr(modified, 0, 4) = '2021'".to_string()
),
simple!(
col("value")
.cast_to(
ParseTest {
expr: col("value").cast_to(
&arrow_schema::DataType::Utf8,
&table
.snapshot()
Expand All @@ -640,8 +667,23 @@ mod test {
)
.unwrap()
.eq(lit("1")),
"arrow_cast(value, 'Utf8') = '1'".to_string()
),
expected: "arrow_cast(value, 'Utf8') = '1'".to_string(),
override_expected_expr: Some(
datafusion_expr::Expr::BinaryExpr(BinaryExpr {
left: Box::new(datafusion_expr::Expr::ScalarFunction(
ScalarFunction {
func_def: ScalarFunctionDefinition::UDF(arrow_cast()),
args: vec![
col("value"),
lit(ScalarValue::Utf8(Some("Utf8".into())))
]
}
)),
op: datafusion_expr::Operator::Eq,
right: Box::new(lit(ScalarValue::Utf8(Some("1".into()))))
})
),
},
simple!(
col("_struct").field("a").eq(lit(20_i64)),
"_struct['a'] = 20".to_string()
Expand All @@ -662,11 +704,16 @@ mod test {
expr: col("_timestamp_ntz").gt(lit(ScalarValue::TimestampMicrosecond(Some(1262304000000000), None))),
expected: "_timestamp_ntz > arrow_cast('2010-01-01T00:00:00.000000', 'Timestamp(Microsecond, None)')".to_string(),
override_expected_expr: Some(col("_timestamp_ntz").gt(
datafusion_expr::Expr::Cast( Cast {
expr: Box::new(lit(ScalarValue::Utf8(Some("2010-01-01T00:00:00.000000".into())))),
data_type:ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, None)
}
))),
datafusion_expr::Expr::ScalarFunction(
ScalarFunction {
func_def: ScalarFunctionDefinition::UDF(arrow_cast()),
args: vec![
lit(ScalarValue::Utf8(Some("2010-01-01T00:00:00.000000".into()))),
lit(ScalarValue::Utf8(Some("Timestamp(Microsecond, None)".into())))
]
}
)
)),
},
ParseTest {
expr: col("_timestamp").gt(lit(ScalarValue::TimestampMicrosecond(
Expand All @@ -675,10 +722,16 @@ mod test {
))),
expected: "_timestamp > arrow_cast('2010-01-01T00:00:00.000000', 'Timestamp(Microsecond, Some(\"UTC\"))')".to_string(),
override_expected_expr: Some(col("_timestamp").gt(
datafusion_expr::Expr::Cast( Cast {
expr: Box::new(lit(ScalarValue::Utf8(Some("2010-01-01T00:00:00.000000".into())))),
data_type:ArrowDataType::Timestamp(arrow_schema::TimeUnit::Microsecond, Some("UTC".into()))
}))),
datafusion_expr::Expr::ScalarFunction(
ScalarFunction {
func_def: ScalarFunctionDefinition::UDF(arrow_cast()),
args: vec![
lit(ScalarValue::Utf8(Some("2010-01-01T00:00:00.000000".into()))),
lit(ScalarValue::Utf8(Some("Timestamp(Microsecond, Some(\"UTC\"))".into())))
]
}
)
)),
},
];

Expand Down
7 changes: 4 additions & 3 deletions crates/core/src/delta_datafusion/find_files/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, Phy
use datafusion::prelude::SessionContext;
use datafusion_common::{DFSchemaRef, Result, ToDFSchema};
use datafusion_expr::{col, Expr, LogicalPlan, UserDefinedLogicalNode};
use datafusion_physical_expr::create_physical_expr;
use lazy_static::lazy_static;

use crate::delta_datafusion::find_files::logical::FindFilesNode;
Expand All @@ -29,6 +28,8 @@ use crate::logstore::LogStoreRef;
use crate::table::state::DeltaTableState;
use crate::DeltaTableError;

use super::create_physical_expr_fix;

pub mod logical;
pub mod physical;

Expand Down Expand Up @@ -160,8 +161,8 @@ async fn scan_table_by_files(
let input_schema = scan.logical_schema.as_ref().to_owned();
let input_dfschema = input_schema.clone().try_into()?;

let predicate_expr = create_physical_expr(
&Expr::IsTrue(Box::new(expression.clone())),
let predicate_expr = create_physical_expr_fix(
Expr::IsTrue(Box::new(expression.clone())),
&input_dfschema,
state.execution_props(),
)?;
Expand Down
20 changes: 12 additions & 8 deletions crates/core/src/delta_datafusion/find_files/physical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ use arrow_schema::SchemaRef;
use datafusion::error::Result;
use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
use datafusion::physical_plan::memory::MemoryStream;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan};
use datafusion::physical_plan::{
DisplayAs, DisplayFormatType, ExecutionMode, ExecutionPlan, PlanProperties,
};
use datafusion::prelude::SessionContext;
use datafusion_common::tree_node::TreeNode;
use datafusion_expr::Expr;
use datafusion_physical_expr::{Partitioning, PhysicalSortExpr};
use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
use futures::stream::BoxStream;
use futures::{FutureExt, Stream, StreamExt, TryStreamExt};

Expand All @@ -28,6 +30,7 @@ pub struct FindFilesExec {
predicate: Expr,
state: DeltaTableState,
log_store: LogStoreRef,
plan_properties: PlanProperties,
}

impl FindFilesExec {
Expand All @@ -36,6 +39,11 @@ impl FindFilesExec {
predicate,
log_store,
state,
plan_properties: PlanProperties::new(
EquivalenceProperties::new(ONLY_FILES_SCHEMA.clone()),
Partitioning::RoundRobinBatch(num_cpus::get()),
ExecutionMode::Bounded,
),
})
}
}
Expand Down Expand Up @@ -85,12 +93,8 @@ impl ExecutionPlan for FindFilesExec {
ONLY_FILES_SCHEMA.clone()
}

fn output_partitioning(&self) -> Partitioning {
Partitioning::RoundRobinBatch(num_cpus::get())
}

fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
None
fn properties(&self) -> &PlanProperties {
&self.plan_properties
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
Expand Down
Loading

0 comments on commit 9d3ecbe

Please sign in to comment.