Skip to content

Commit

Permalink
Upgrade datafusion 39 (#728)
Browse files Browse the repository at this point in the history
* deps: update datafusion to 39.0.0, pyo3 to 0.21, and object_store to 0.10.1

`datafusion-common` also depends on `pyo3`, so they need to be upgraded together.

* feat: remove GetIndexField

datafusion replaced Expr::GetIndexField with a FieldAccessor trait.

Ref apache/datafusion#10568
Ref apache/datafusion#10769

* feat: update ScalarFunction

The field `func_name` was changed to `func` as part of removing `ScalarFunctionDefinition` upstream.

Ref apache/datafusion#10325

* feat: incorporate upstream array_slice fixes

Fixes #670

* update ExectionPlan::children impl for DatasetExec

Ref apache/datafusion#10543

* update value_interval_daytime

Ref apache/arrow-rs#5769

* update regexp_replace and regexp_match

Fixes #677

* add gil-refs feature to pyo3

This silences pyo3's deprecation warnings for its new Bounds api.

It's the 1st step of the migration, and should be removed before merge.

Ref https://pyo3.rs/v0.21.0/migration#from-020-to-021

* fix signature for octet_length

Ref apache/datafusion#10726

* update signature for covar_samp

AggregateUDF expressions now have a builder API design, which removes arguments like filter and order_by

Ref apache/datafusion#10545
Ref apache/datafusion#10492

* convert covar_pop to expr_fn api

Ref: https://github.com/apache/datafusion/pull/10418/files

* convert median to expr_fn api

Ref apache/datafusion#10644

* convert variance sample to UDF

Ref apache/datafusion#10667

* convert first_value and last_value to UDFs

Ref apache/datafusion#10648

* checkpointing with a few todos to fix remaining compile errors

* impl PyExpr::python_value for IntervalDayTime and IntervalMonthDayNano

* convert sum aggregate function to UDF

* remove unnecessary clone on double reference

* apply cargo fmt

* remove duplicate allow-dead-code annotation

* update tpch examples for new pyarrow interval

Fixes #665

* marked q11 tpch example as expected fail

Ref #730

* add default stride of None back to array_slice
  • Loading branch information
Michael-J-Ward authored Jun 14, 2024
1 parent 860283a commit b5446ef
Show file tree
Hide file tree
Showing 23 changed files with 467 additions and 327 deletions.
540 changes: 321 additions & 219 deletions Cargo.lock

Large diffs are not rendered by default.

22 changes: 11 additions & 11 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

[package]
name = "datafusion-python"
version = "38.0.1"
version = "39.0.0"
homepage = "https://datafusion.apache.org/python"
repository = "https://github.com/apache/datafusion-python"
authors = ["Apache DataFusion <[email protected]>"]
Expand All @@ -36,28 +36,28 @@ substrait = ["dep:datafusion-substrait"]
[dependencies]
tokio = { version = "1.35", features = ["macros", "rt", "rt-multi-thread", "sync"] }
rand = "0.8"
pyo3 = { version = "0.20", features = ["extension-module", "abi3", "abi3-py38"] }
datafusion = { version = "38.0.0", features = ["pyarrow", "avro", "unicode_expressions"] }
datafusion-common = { version = "38.0.0", features = ["pyarrow"] }
datafusion-expr = "38.0.0"
datafusion-functions-array = "38.0.0"
datafusion-optimizer = "38.0.0"
datafusion-sql = "38.0.0"
datafusion-substrait = { version = "38.0.0", optional = true }
pyo3 = { version = "0.21", features = ["extension-module", "abi3", "abi3-py38", "gil-refs"] }
datafusion = { version = "39.0.0", features = ["pyarrow", "avro", "unicode_expressions"] }
datafusion-common = { version = "39.0.0", features = ["pyarrow"] }
datafusion-expr = "39.0.0"
datafusion-functions-array = "39.0.0"
datafusion-optimizer = "39.0.0"
datafusion-sql = "39.0.0"
datafusion-substrait = { version = "39.0.0", optional = true }
prost = "0.12"
prost-types = "0.12"
uuid = { version = "1.8", features = ["v4"] }
mimalloc = { version = "0.1", optional = true, default-features = false, features = ["local_dynamic_tls"] }
async-trait = "0.1"
futures = "0.3"
object_store = { version = "0.9.1", features = ["aws", "gcp", "azure"] }
object_store = { version = "0.10.1", features = ["aws", "gcp", "azure"] }
parking_lot = "0.12"
regex-syntax = "0.8.1"
syn = "2.0.43"
url = "2.2"

[build-dependencies]
pyo3-build-config = "0.20.0"
pyo3-build-config = "0.21"

[lib]
name = "datafusion_python"
Expand Down
3 changes: 2 additions & 1 deletion docs/source/user-guide/common-operations/functions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,13 @@ DataFusion offers a range of helpful options.
f.left(col('"Name"'), literal(4)).alias("code")
)
This also includes the functions for regular expressions like :func:`.regexp_match`
This also includes the functions for regular expressions like :func:`.regexp_replace` and :func:`.regexp_match`

.. ipython:: python
df.select(
f.regexp_match(col('"Name"'), literal("Char")).alias("dragons"),
f.regexp_replace(col('"Name"'), literal("saur"), literal("fleur")).alias("flowers")
)
Expand Down
5 changes: 4 additions & 1 deletion examples/tpch/_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,10 @@ def check_q17(df):
("q08_market_share", "q8"),
("q09_product_type_profit_measure", "q9"),
("q10_returned_item_reporting", "q10"),
("q11_important_stock_identification", "q11"),
pytest.param(
"q11_important_stock_identification", "q11",
marks=pytest.mark.xfail # https://github.com/apache/datafusion-python/issues/730
),
("q12_ship_mode_order_priority", "q12"),
("q13_customer_distribution", "q13"),
("q14_promotion_effect", "q14"),
Expand Down
4 changes: 1 addition & 3 deletions examples/tpch/q01_pricing_summary_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,7 @@
# want to report results for. It should be between 60-120 days before the end.
DAYS_BEFORE_FINAL = 90

# Note: this is a hack on setting the values. It should be set differently once
# https://github.com/apache/datafusion-python/issues/665 is resolved.
interval = pa.scalar((0, 0, DAYS_BEFORE_FINAL), type=pa.month_day_nano_interval())
interval = pa.scalar((0, DAYS_BEFORE_FINAL, 0), type=pa.month_day_nano_interval())

print("Final date in database:", greatest_ship_date)

Expand Down
4 changes: 1 addition & 3 deletions examples/tpch/q04_order_priority_checking.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,7 @@
# Create a date object from the string
date = datetime.strptime(DATE_OF_INTEREST, "%Y-%m-%d").date()

# Note: this is a hack on setting the values. It should be set differently once
# https://github.com/apache/datafusion-python/issues/665 is resolved.
interval = pa.scalar((0, 0, INTERVAL_DAYS), type=pa.month_day_nano_interval())
interval = pa.scalar((0, INTERVAL_DAYS, 0), type=pa.month_day_nano_interval())

# Limit results to cases where commitment date before receipt date
# Aggregate the results so we only get one row to join with the order table.
Expand Down
4 changes: 1 addition & 3 deletions examples/tpch/q05_local_supplier_volume.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@

date = datetime.strptime(DATE_OF_INTEREST, "%Y-%m-%d").date()

# Note: this is a hack on setting the values. It should be set differently once
# https://github.com/apache/datafusion-python/issues/665 is resolved.
interval = pa.scalar((0, 0, INTERVAL_DAYS), type=pa.month_day_nano_interval())
interval = pa.scalar((0, INTERVAL_DAYS, 0), type=pa.month_day_nano_interval())

# Load the dataframes we need

Expand Down
4 changes: 1 addition & 3 deletions examples/tpch/q06_forecasting_revenue_change.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,7 @@

date = datetime.strptime(DATE_OF_INTEREST, "%Y-%m-%d").date()

# Note: this is a hack on setting the values. It should be set differently once
# https://github.com/apache/datafusion-python/issues/665 is resolved.
interval = pa.scalar((0, 0, INTERVAL_DAYS), type=pa.month_day_nano_interval())
interval = pa.scalar((0, INTERVAL_DAYS, 0), type=pa.month_day_nano_interval())

# Load the dataframes we need

Expand Down
4 changes: 1 addition & 3 deletions examples/tpch/q10_returned_item_reporting.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@

date_start_of_quarter = lit(datetime.strptime(DATE_START_OF_QUARTER, "%Y-%m-%d").date())

# Note: this is a hack on setting the values. It should be set differently once
# https://github.com/apache/datafusion-python/issues/665 is resolved.
interval_one_quarter = lit(pa.scalar((0, 0, 92), type=pa.month_day_nano_interval()))
interval_one_quarter = lit(pa.scalar((0, 92, 0), type=pa.month_day_nano_interval()))

# Load the dataframes we need

Expand Down
4 changes: 1 addition & 3 deletions examples/tpch/q12_ship_mode_order_priority.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,7 @@

date = datetime.strptime(DATE_OF_INTEREST, "%Y-%m-%d").date()

# Note: this is a hack on setting the values. It should be set differently once
# https://github.com/apache/datafusion-python/issues/665 is resolved.
interval = pa.scalar((0, 0, 365), type=pa.month_day_nano_interval())
interval = pa.scalar((0, 365, 0), type=pa.month_day_nano_interval())


df = df_lineitem.filter(col("l_receiptdate") >= lit(date)).filter(
Expand Down
5 changes: 2 additions & 3 deletions examples/tpch/q14_promotion_effect.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,8 @@
DATE = "1995-09-01"

date_of_interest = lit(datetime.strptime(DATE, "%Y-%m-%d").date())
# Note: this is a hack on setting the values. It should be set differently once
# https://github.com/apache/datafusion-python/issues/665 is resolved.
interval_one_month = lit(pa.scalar((0, 0, 30), type=pa.month_day_nano_interval()))

interval_one_month = lit(pa.scalar((0, 30, 0), type=pa.month_day_nano_interval()))

# Load the dataframes we need

Expand Down
5 changes: 2 additions & 3 deletions examples/tpch/q15_top_supplier.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,8 @@
DATE = "1996-01-01"

date_of_interest = lit(datetime.strptime(DATE, "%Y-%m-%d").date())
# Note: this is a hack on setting the values. It should be set differently once
# https://github.com/apache/datafusion-python/issues/665 is resolved.
interval_3_months = lit(pa.scalar((0, 0, 91), type=pa.month_day_nano_interval()))

interval_3_months = lit(pa.scalar((0, 91, 0), type=pa.month_day_nano_interval()))

# Load the dataframes we need

Expand Down
4 changes: 1 addition & 3 deletions examples/tpch/q20_potential_part_promotion.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,7 @@

date = datetime.strptime(DATE_OF_INTEREST, "%Y-%m-%d").date()

# Note: this is a hack on setting the values. It should be set differently once
# https://github.com/apache/datafusion-python/issues/665 is resolved.
interval = pa.scalar((0, 0, 365), type=pa.month_day_nano_interval())
interval = pa.scalar((0, 365, 0), type=pa.month_day_nano_interval())

# Filter down dataframes
df_nation = df_nation.filter(col("n_name") == lit(NATION_OF_INTEREST))
Expand Down
2 changes: 0 additions & 2 deletions python/datafusion/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
ScalarVariable,
Sort,
TableScan,
GetIndexedField,
Not,
IsNotNull,
IsTrue,
Expand Down Expand Up @@ -116,7 +115,6 @@
"SimilarTo",
"ScalarVariable",
"Alias",
"GetIndexedField",
"Not",
"IsNotNull",
"IsTrue",
Expand Down
1 change: 0 additions & 1 deletion python/datafusion/tests/test_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,6 @@ def py_flatten(arr):
pytest.param(
lambda col: f.list_slice(col, literal(-1), literal(2)),
lambda data: [arr[-1:2] for arr in data],
marks=pytest.mark.xfail,
),
[
lambda col: f.array_intersect(col, literal([3.0, 4.0])),
Expand Down
2 changes: 0 additions & 2 deletions python/datafusion/tests/test_imports.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@
SimilarTo,
ScalarVariable,
Alias,
GetIndexedField,
Not,
IsNotNull,
IsTrue,
Expand Down Expand Up @@ -126,7 +125,6 @@ def test_class_module_is_datafusion():
SimilarTo,
ScalarVariable,
Alias,
GetIndexedField,
Not,
IsNotNull,
IsTrue,
Expand Down
1 change: 1 addition & 0 deletions src/common/data_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ impl DataTypeMap {
pub fn map_from_scalar_to_arrow(scalar_val: &ScalarValue) -> Result<DataType, PyErr> {
match scalar_val {
ScalarValue::Boolean(_) => Ok(DataType::Boolean),
ScalarValue::Float16(_) => Ok(DataType::Float16),
ScalarValue::Float32(_) => Ok(DataType::Float32),
ScalarValue::Float64(_) => Ok(DataType::Float64),
ScalarValue::Decimal128(_, precision, scale) => {
Expand Down
2 changes: 1 addition & 1 deletion src/dataset_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ impl ExecutionPlan for DatasetExec {
self.schema.clone()
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
// this is a leaf node and has no children
vec![]
}
Expand Down
32 changes: 14 additions & 18 deletions src/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ use std::sync::Arc;

use datafusion::arrow::datatypes::{DataType, Field};
use datafusion::arrow::pyarrow::PyArrowType;
use datafusion::functions::core::expr_ext::FieldAccessor;
use datafusion::scalar::ScalarValue;
use datafusion_expr::{
col,
expr::{AggregateFunction, InList, InSubquery, ScalarFunction, Sort, WindowFunction},
lit, Between, BinaryExpr, Case, Cast, Expr, GetFieldAccess, GetIndexedField, Like, Operator,
TryCast,
lit, Between, BinaryExpr, Case, Cast, Expr, Like, Operator, TryCast,
};

use crate::common::data_type::{DataTypeMap, RexType};
Expand Down Expand Up @@ -71,7 +71,6 @@ pub mod filter;
pub mod grouping_set;
pub mod in_list;
pub mod in_subquery;
pub mod indexed_field;
pub mod join;
pub mod like;
pub mod limit;
Expand Down Expand Up @@ -216,13 +215,7 @@ impl PyExpr {
}

fn __getitem__(&self, key: &str) -> PyResult<PyExpr> {
Ok(Expr::GetIndexedField(GetIndexedField::new(
Box::new(self.expr.clone()),
GetFieldAccess::NamedStructField {
name: ScalarValue::Utf8(Some(key.to_string())),
},
))
.into())
Ok(self.expr.clone().field(key).into())
}

#[staticmethod]
Expand Down Expand Up @@ -263,7 +256,7 @@ impl PyExpr {
pub fn rex_type(&self) -> PyResult<RexType> {
Ok(match self.expr {
Expr::Alias(..) => RexType::Alias,
Expr::Column(..) | Expr::GetIndexedField { .. } => RexType::Reference,
Expr::Column(..) => RexType::Reference,
Expr::ScalarVariable(..) | Expr::Literal(..) => RexType::Literal,
Expr::BinaryExpr { .. }
| Expr::Not(..)
Expand Down Expand Up @@ -314,6 +307,11 @@ impl PyExpr {
),
)),
ScalarValue::Boolean(v) => Ok(v.into_py(py)),
ScalarValue::Float16(_) => Err(py_datafusion_err(
datafusion_common::DataFusionError::NotImplemented(
"ScalarValue::Float16".to_string(),
),
)),
ScalarValue::Float32(v) => Ok(v.into_py(py)),
ScalarValue::Float64(v) => Ok(v.into_py(py)),
ScalarValue::Decimal128(v, _, _) => Ok(v.into_py(py)),
Expand Down Expand Up @@ -355,8 +353,10 @@ impl PyExpr {
ScalarValue::TimestampMicrosecond(v, _) => Ok(v.into_py(py)),
ScalarValue::TimestampNanosecond(v, _) => Ok(v.into_py(py)),
ScalarValue::IntervalYearMonth(v) => Ok(v.into_py(py)),
ScalarValue::IntervalDayTime(v) => Ok(v.into_py(py)),
ScalarValue::IntervalMonthDayNano(v) => Ok(v.into_py(py)),
ScalarValue::IntervalDayTime(v) => Ok(ScalarValue::IntervalDayTime(*v).into_py(py)),
ScalarValue::IntervalMonthDayNano(v) => {
Ok(ScalarValue::IntervalMonthDayNano(*v).into_py(py))
}
ScalarValue::DurationSecond(v) => Ok(v.into_py(py)),
ScalarValue::DurationMicrosecond(v) => Ok(v.into_py(py)),
ScalarValue::DurationNanosecond(v) => Ok(v.into_py(py)),
Expand Down Expand Up @@ -417,7 +417,6 @@ impl PyExpr {
| Expr::IsNotFalse(expr)
| Expr::IsNotUnknown(expr)
| Expr::Negative(expr)
| Expr::GetIndexedField(GetIndexedField { expr, .. })
| Expr::Cast(Cast { expr, .. })
| Expr::TryCast(TryCast { expr, .. })
| Expr::Sort(Sort { expr, .. })
Expand Down Expand Up @@ -513,9 +512,7 @@ impl PyExpr {
op,
right: _,
}) => format!("{op}"),
Expr::ScalarFunction(ScalarFunction { func_def, args: _ }) => {
func_def.name().to_string()
}
Expr::ScalarFunction(ScalarFunction { func, args: _ }) => func.name().to_string(),
Expr::Cast { .. } => "cast".to_string(),
Expr::Between { .. } => "between".to_string(),
Expr::Case { .. } => "case".to_string(),
Expand Down Expand Up @@ -674,7 +671,6 @@ pub(crate) fn init_module(m: &PyModule) -> PyResult<()> {
m.add_class::<cast::PyCast>()?;
m.add_class::<cast::PyTryCast>()?;
m.add_class::<between::PyBetween>()?;
m.add_class::<indexed_field::PyGetIndexedField>()?;
m.add_class::<explain::PyExplain>()?;
m.add_class::<limit::PyLimit>()?;
m.add_class::<aggregate::PyAggregate>()?;
Expand Down
7 changes: 1 addition & 6 deletions src/expr/literal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,7 @@ impl PyLiteral {

pub fn value_interval_day_time(&self) -> PyResult<Option<(i32, i32)>> {
match &self.value {
ScalarValue::IntervalDayTime(Some(iv)) => {
let interval = *iv as u64;
let days = (interval >> 32) as i32;
let ms = interval as i32;
Ok(Some((days, ms)))
}
ScalarValue::IntervalDayTime(Some(iv)) => Ok(Some((iv.days, iv.milliseconds))),
ScalarValue::IntervalDayTime(None) => Ok(None),
other => Err(unexpected_literal_value(other)),
}
Expand Down
1 change: 0 additions & 1 deletion src/expr/signature.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use pyo3::prelude::*;

#[allow(dead_code)]
#[pyclass(name = "Signature", module = "datafusion.expr", subclass)]
#[allow(dead_code)]
#[derive(Clone)]
pub struct PySignature {
type_signature: TypeSignature,
Expand Down
Loading

0 comments on commit b5446ef

Please sign in to comment.