chore: upgrade to datafusion 43 #2886

Merged · 14 commits · Nov 21, 2024
6 changes: 3 additions & 3 deletions .github/workflows/python_build.yml
@@ -31,7 +31,7 @@ jobs:
run: make check-rust

test-minimal:
name: Python Build (Python 3.8 PyArrow 16.0.0)
name: Python Build (Python 3.9 PyArrow 16.0.0)
runs-on: ubuntu-latest
env:
RUSTFLAGS: "-C debuginfo=line-tables-only"
@@ -43,7 +43,7 @@ jobs:
- name: Setup Environment
uses: ./.github/actions/setup-env
with:
python-version: 3.8
python-version: 3.9

- name: Build and install deltalake
run: |
@@ -135,7 +135,7 @@ jobs:

strategy:
matrix:
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
python-version: ["3.9", "3.10", "3.11", "3.12"]

steps:
- uses: actions/checkout@v3
46 changes: 23 additions & 23 deletions Cargo.toml
@@ -26,34 +26,34 @@ debug = true
debug = "line-tables-only"

[workspace.dependencies]
delta_kernel = { version = "=0.3.0" }
delta_kernel = { version = "0.3.1" }
# delta_kernel = { path = "../delta-kernel-rs/kernel", version = "0.3.0" }

# arrow
arrow = { version = "52" }
arrow-arith = { version = "52" }
arrow-array = { version = "52", features = ["chrono-tz"] }
arrow-buffer = { version = "52" }
arrow-cast = { version = "52" }
arrow-ipc = { version = "52" }
arrow-json = { version = "52" }
arrow-ord = { version = "52" }
arrow-row = { version = "52" }
arrow-schema = { version = "52" }
arrow-select = { version = "52" }
object_store = { version = "0.10.2" }
parquet = { version = "52" }
arrow = { version = "53" }
arrow-arith = { version = "53" }
arrow-array = { version = "53", features = ["chrono-tz"] }
arrow-buffer = { version = "53" }
arrow-cast = { version = "53" }
arrow-ipc = { version = "53" }
arrow-json = { version = "53" }
arrow-ord = { version = "53" }
arrow-row = { version = "53" }
arrow-schema = { version = "53" }
arrow-select = { version = "53" }
object_store = { version = "0.11" }
parquet = { version = "53" }

# datafusion
datafusion = { version = "41" }
datafusion-expr = { version = "41" }
datafusion-common = { version = "41" }
datafusion-proto = { version = "41" }
datafusion-sql = { version = "41" }
datafusion-physical-expr = { version = "41" }
datafusion-physical-plan = { version = "41" }
datafusion-functions = { version = "41" }
datafusion-functions-aggregate = { version = "41" }
datafusion = { version = "43" }
datafusion-expr = { version = "43" }
datafusion-common = { version = "43" }
datafusion-proto = { version = "43" }
datafusion-sql = { version = "43" }
datafusion-physical-expr = { version = "43" }
datafusion-physical-plan = { version = "43" }
datafusion-functions = { version = "43" }
datafusion-functions-aggregate = { version = "43" }

# serde
serde = { version = "1.0.194", features = ["derive"] }
4 changes: 2 additions & 2 deletions crates/aws/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "deltalake-aws"
version = "0.4.2"
version = "0.5.0"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
@@ -12,7 +12,7 @@ repository.workspace = true
rust-version.workspace = true

[dependencies]
deltalake-core = { version = "0.21.0", path = "../core" }
deltalake-core = { version = "0.22.0", path = "../core" }
aws-smithy-runtime-api = { version="1.7" }
aws-smithy-runtime = { version="1.7", optional = true}
aws-credential-types = { version="1.2", features = ["hardcoded-credentials"]}
4 changes: 2 additions & 2 deletions crates/azure/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "deltalake-azure"
version = "0.4.0"
version = "0.5.0"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
@@ -12,7 +12,7 @@ repository.workspace = true
rust-version.workspace = true

[dependencies]
deltalake-core = { version = "0.21.0", path = "../core", features = [
deltalake-core = { version = "0.22.0", path = "../core", features = [
"datafusion",
] }
lazy_static = "1"
4 changes: 2 additions & 2 deletions crates/catalog-glue/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "deltalake-catalog-glue"
version = "0.5.0"
version = "0.6.0"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
@@ -15,7 +15,7 @@ rust-version.workspace = true
async-trait = { workspace = true }
aws-config = "1"
aws-sdk-glue = "1"
deltalake-core = { version = "0.21.0", path = "../core" }
deltalake-core = { version = "0.22.0", path = "../core" }
thiserror = { workspace = true }

[dev-dependencies]
2 changes: 1 addition & 1 deletion crates/core/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "deltalake-core"
version = "0.21.0"
version = "0.22.0"
authors.workspace = true
keywords.workspace = true
readme.workspace = true
1 change: 1 addition & 0 deletions crates/core/src/data_catalog/storage/mod.rs
@@ -30,6 +30,7 @@ const DELTA_LOG_FOLDER: &str = "_delta_log";
///
/// assuming it contains valid deltalake data, i.e. a `_delta_log` folder:
/// s3://host.example.com:3000/data/tpch/customer/_delta_log/
#[derive(Debug)]
pub struct ListingSchemaProvider {
authority: String,
/// Underlying object store
3 changes: 3 additions & 0 deletions crates/core/src/data_catalog/unity/datafusion.rs
@@ -17,6 +17,7 @@ use crate::data_catalog::models::ListSchemasResponse;
use crate::DeltaTableBuilder;

/// In-memory list of catalogs populated by unity catalog
#[derive(Debug)]
pub struct UnityCatalogList {
/// Collection of catalogs containing schemas and ultimately TableProviders
pub catalogs: DashMap<String, Arc<dyn CatalogProvider>>,
@@ -73,6 +74,7 @@ impl CatalogProviderList for UnityCatalogList {
}

/// A datafusion [`CatalogProvider`] backed by Databricks UnityCatalog
#[derive(Debug)]
pub struct UnityCatalogProvider {
/// Parent catalog for schemas of interest.
pub schemas: DashMap<String, Arc<dyn SchemaProvider>>,
@@ -124,6 +126,7 @@ impl CatalogProvider for UnityCatalogProvider {
}

/// A datafusion [`SchemaProvider`] backed by Databricks UnityCatalog
#[derive(Debug)]
pub struct UnitySchemaProvider {
/// UnityCatalog Api client
client: Arc<UnityCatalog>,
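Aside (not part of this PR's diff): a minimal sketch of why the `#[derive(Debug)]` additions above are needed, assuming DataFusion 43 declares `Debug` as a supertrait of its catalog and schema provider traits. The trait below is a simplified stand-in for illustration, not DataFusion's actual definition.

```rust
use std::any::Any;
use std::fmt::Debug;

// Simplified stand-in for a DataFusion provider trait that now has a Debug supertrait.
trait ProviderLike: Debug + Send + Sync {
    fn as_any(&self) -> &dyn Any;
    fn name(&self) -> &str;
}

// Without #[derive(Debug)] this type could no longer implement the trait.
#[derive(Debug)]
struct ExampleProvider {
    name: String,
}

impl ProviderLike for ExampleProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn name(&self) -> &str {
        &self.name
    }
}

fn main() {
    // Debug formatting also becomes available through trait objects.
    let provider: Box<dyn ProviderLike> = Box::new(ExampleProvider { name: "unity".into() });
    println!("{provider:?}");
}
```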
176 changes: 161 additions & 15 deletions crates/core/src/delta_datafusion/expr.rs
@@ -23,25 +23,171 @@
use std::fmt::{self, Display, Error, Formatter, Write};
use std::sync::Arc;

use arrow_schema::DataType;
use arrow_array::{Array, GenericListArray};
use arrow_schema::{DataType, Field};
use chrono::{DateTime, NaiveDate};
use datafusion::execution::context::SessionState;
use datafusion::execution::session_state::SessionStateBuilder;
use datafusion::execution::FunctionRegistry;
use datafusion::functions_array::make_array::MakeArray;
use datafusion_common::Result as DFResult;
use datafusion_common::{config::ConfigOptions, DFSchema, Result, ScalarValue, TableReference};
use datafusion_expr::expr::InList;
use datafusion_expr::planner::ExprPlanner;
use datafusion_expr::{AggregateUDF, Between, BinaryExpr, Cast, Expr, Like, TableSource};
// Needed for MakeParquetArray
use datafusion_expr::{ColumnarValue, Documentation, ScalarUDF, ScalarUDFImpl, Signature};
use datafusion_functions::core::planner::CoreFunctionPlanner;
use datafusion_sql::planner::{ContextProvider, SqlToRel};
use datafusion_sql::sqlparser::ast::escape_quoted_string;
use datafusion_sql::sqlparser::dialect::GenericDialect;
use datafusion_sql::sqlparser::parser::Parser;
use datafusion_sql::sqlparser::tokenizer::Tokenizer;
use tracing::log::*;

use super::DeltaParserOptions;
use crate::{DeltaResult, DeltaTableError};

/// This struct is like DataFusion's MakeArray but ensures that `element` is used rather than `item`
/// as the field name within the list.
#[derive(Debug)]
struct MakeParquetArray {
/// The actual upstream UDF, which we're just totally cheating and using
actual: MakeArray,
/// Aliases for this UDF
aliases: Vec<String>,
}

impl MakeParquetArray {
pub fn new() -> Self {
let actual = MakeArray::default();
let aliases = vec!["make_array".into(), "make_list".into()];
Self { actual, aliases }
}
}

impl ScalarUDFImpl for MakeParquetArray {
fn as_any(&self) -> &dyn std::any::Any {
self
}

fn name(&self) -> &str {
"make_parquet_array"
}

fn signature(&self) -> &Signature {
self.actual.signature()
}

fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
let r_type = match arg_types.len() {
0 => Ok(DataType::List(Arc::new(Field::new(
"element",
DataType::Int32,
true,
)))),
_ => {
// At this point, all the types in the array should be coerced to the same one
Ok(DataType::List(Arc::new(Field::new(
"element",
arg_types[0].to_owned(),
true,
))))
}
};
debug!("MakeParquetArray return_type -> {r_type:?}");
r_type
}

fn invoke(&self, args: &[ColumnarValue]) -> Result<ColumnarValue> {
let mut data_type = DataType::Null;
for arg in args {
data_type = arg.data_type();
}

match self.actual.invoke(args)? {
ColumnarValue::Scalar(ScalarValue::List(df_array)) => {
let field = Arc::new(Field::new("element", data_type, true));
let result = Ok(ColumnarValue::Scalar(ScalarValue::List(Arc::new(
GenericListArray::<i32>::try_new(
field,
df_array.offsets().clone(),
arrow_array::make_array(df_array.values().into_data()),
None,
)?,
))));
debug!("MakeParquetArray;invoke returning: {result:?}");
result
}
others => {
error!("Unexpected response inside MakeParquetArray! {others:?}");
Ok(others)
}
}
}

fn invoke_no_args(&self, number_rows: usize) -> Result<ColumnarValue> {
self.actual.invoke_no_args(number_rows)
}

fn aliases(&self) -> &[String] {
&self.aliases
}

fn coerce_types(&self, arg_types: &[DataType]) -> Result<Vec<DataType>> {
self.actual.coerce_types(arg_types)
}

fn documentation(&self) -> Option<&Documentation> {
self.actual.documentation()
}
}

use datafusion::functions_array::planner::{FieldAccessPlanner, NestedFunctionPlanner};

/// This exists because the NestedFunctionPlanner, _not_ the UserDefinedFunctionPlanner, handles the
/// insertion of "make_array" which is used to turn [100] into List<field=element, values=[100]>
///
/// **screaming intensifies**
#[derive(Debug)]
struct CustomNestedFunctionPlanner {
original: NestedFunctionPlanner,
}

impl Default for CustomNestedFunctionPlanner {
fn default() -> Self {
Self {
original: NestedFunctionPlanner,
}
}
}

use datafusion_expr::planner::{PlannerResult, RawBinaryExpr};
impl ExprPlanner for CustomNestedFunctionPlanner {
fn plan_array_literal(
&self,
exprs: Vec<Expr>,
_schema: &DFSchema,
) -> Result<PlannerResult<Vec<Expr>>> {
let udf = Arc::new(ScalarUDF::from(MakeParquetArray::new()));

Ok(PlannerResult::Planned(udf.call(exprs)))
}
fn plan_binary_op(
&self,
expr: RawBinaryExpr,
schema: &DFSchema,
) -> Result<PlannerResult<RawBinaryExpr>> {
self.original.plan_binary_op(expr, schema)
}
fn plan_make_map(&self, args: Vec<Expr>) -> Result<PlannerResult<Vec<Expr>>> {
self.original.plan_make_map(args)
}
fn plan_any(&self, expr: RawBinaryExpr) -> Result<PlannerResult<RawBinaryExpr>> {
self.original.plan_any(expr)
}
}

pub(crate) struct DeltaContextProvider<'a> {
state: SessionState,
/// Keeping this around just to make use of the 'a lifetime
@@ -51,22 +197,22 @@ pub(crate) struct DeltaContextProvider<'a> {

impl<'a> DeltaContextProvider<'a> {
fn new(state: &'a SessionState) -> Self {
let planners = state.expr_planners();
// default planners are [CoreFunctionPlanner, NestedFunctionPlanner, FieldAccessPlanner,
// UserDefinedFunctionPlanner]
let planners: Vec<Arc<dyn ExprPlanner>> = vec![
Arc::new(CoreFunctionPlanner::default()),
Arc::new(CustomNestedFunctionPlanner::default()),
Arc::new(FieldAccessPlanner),
Arc::new(datafusion::functions::planner::UserDefinedFunctionPlanner),
];
// Disable the above for testing
//let planners = state.expr_planners();
let new_state = SessionStateBuilder::new_from_existing(state.clone())
.with_expr_planners(planners.clone())
.build();
DeltaContextProvider {
planners,
// Creating a new session state with overridden scalar_functions since
// the get_field() UDF was dropped from the default scalar functions upstream in
// `36660fe10d9c0cdff62e0da0b94bee28422d3419`
state: SessionStateBuilder::new_from_existing(state.clone())
.with_scalar_functions(
state
.scalar_functions()
.values()
.cloned()
.chain(std::iter::once(datafusion::functions::core::get_field()))
.collect(),
)
.build(),
state: new_state,
_original: state,
}
}
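For context (again, not part of the diff): the `element`-versus-`item` list field naming that `MakeParquetArray` exists to enforce can be reproduced directly with the arrow-array API the wrapper already uses. The sketch below is illustrative only, under that assumption; the local names `field`, `values`, and `offsets` belong to the example, not the PR.

```rust
use std::sync::Arc;

use arrow_array::{Array, ArrayRef, GenericListArray, Int32Array};
use arrow_buffer::OffsetBuffer;
use arrow_schema::{ArrowError, DataType, Field};

fn main() -> Result<(), ArrowError> {
    // Build the single list value [100] with the inner field explicitly named
    // "element" instead of Arrow's default "item".
    let field = Arc::new(Field::new("element", DataType::Int32, true));
    let values: ArrayRef = Arc::new(Int32Array::from(vec![100]));
    let offsets = OffsetBuffer::<i32>::from_lengths([1]);
    let list = GenericListArray::<i32>::try_new(field, offsets, values, None)?;

    // The resulting type is List<element: Int32> rather than List<item: Int32>,
    // matching what the wrapper rewrites the default make_array output to.
    assert_eq!(
        list.data_type(),
        &DataType::List(Arc::new(Field::new("element", DataType::Int32, true)))
    );
    Ok(())
}
```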
2 changes: 1 addition & 1 deletion crates/core/src/delta_datafusion/logical.rs
@@ -7,7 +7,7 @@ use datafusion_expr::{LogicalPlan, UserDefinedLogicalNodeCore};
// Metric Observer is used to update DataFusion metrics from a record batch.
// See MetricObserverExec for the physical implementation

#[derive(Debug, Hash, Eq, PartialEq)]
#[derive(Debug, Hash, Eq, PartialEq, PartialOrd)]
pub(crate) struct MetricObserver {
// id is preserved during conversion to physical node
pub id: String,