Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to DataFusion 42, Arrow 53.1 (RC), remove DataFrameDataSource Python logic #513

Merged
merged 19 commits into from
Oct 6, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next commit
Initial update to DataFusion 42 and Arrow 53.1
jonmmease committed Oct 5, 2024
commit 0bf305120c197eb2a196c6e0edf771eff05922b5
1,872 changes: 926 additions & 946 deletions Cargo.lock

Large diffs are not rendered by default.

48 changes: 33 additions & 15 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -13,52 +13,70 @@ members = [
]

[workspace.dependencies]
arrow = { version = "52.2.0", default-features = false }
sqlparser = { version = "0.49.0" }
# arrow = { version = "53.1.0", default-features = false }
arrow = { git = "https://github.com/apache/arrow-rs.git", rev = "53.1.0" }

sqlparser = { version = "0.50.0" }
chrono = { version = "0.4.35", default-features = false }
chrono-tz = { version = "0.9.0", features = [
"case-insensitive",
"filter-by-regex",
] }
reqwest = { version = "0.11.22", default-features = false }
tokio = { version = "1.36.0" }
pyo3 = { version = "0.21.1" }
pythonize = { version = "0.21.1" }
pyo3 = { version = "0.22" }
pythonize = { version = "0.22" }
prost = { version = "0.12.3" }
prost-types = { version = "0.12.3" }
object_store = { version = "0.10.1" }
object_store = { version = "0.11.0" }

[workspace.dependencies.datafusion]
version = "41.0.0"
version = "42.0.0"

[workspace.dependencies.datafusion-common]
version = "41.0.0"
version = "42.0.0"

[workspace.dependencies.datafusion-expr]
version = "41.0.0"
version = "42.0.0"

[workspace.dependencies.datafusion-proto]
version = "41.0.0"
version = "42.0.0"

[workspace.dependencies.datafusion-proto-common]
version = "41.0.0"
version = "42.0.0"

[workspace.dependencies.datafusion-physical-expr]
version = "41.0.0"
version = "42.0.0"

[workspace.dependencies.datafusion-optimizer]
version = "41.0.0"
version = "42.0.0"

[workspace.dependencies.datafusion-functions]
version = "41.0.0"
version = "42.0.0"

[workspace.dependencies.datafusion-functions-nested]
version = "41.0.0"
version = "42.0.0"

[workspace.dependencies.datafusion-functions-aggregate]
version = "41.0.0"
version = "42.0.0"

[workspace.dependencies.datafusion-functions-window]
version = "42.0.0"

[profile.release]
## Tell `rustc` to use the highest optimization level; Link Time Optimization is currently disabled (the `lto` setting below is commented out)
opt-level = 3
# lto = true

[patch.crates-io]
arrow = { git = "https://github.com/apache/arrow-rs.git", rev = "53.1.0" }
arrow-array = { git = "https://github.com/apache/arrow-rs.git", rev = "53.1.0" }
arrow-buffer = { git = "https://github.com/apache/arrow-rs.git", rev = "53.1.0" }
arrow-data = { git = "https://github.com/apache/arrow-rs.git", rev = "53.1.0" }
arrow-schema = { git = "https://github.com/apache/arrow-rs.git", rev = "53.1.0" }
arrow-ord = { git = "https://github.com/apache/arrow-rs.git", rev = "53.1.0" }
arrow-select = { git = "https://github.com/apache/arrow-rs.git", rev = "53.1.0" }
arrow-string = { git = "https://github.com/apache/arrow-rs.git", rev = "53.1.0" }
arrow-ipc = { git = "https://github.com/apache/arrow-rs.git", rev = "53.1.0" }
arrow-cast = { git = "https://github.com/apache/arrow-rs.git", rev = "53.1.0" }
parquet = { git = "https://github.com/apache/arrow-rs.git", rev = "53.1.0" }
1 change: 1 addition & 0 deletions vegafusion-core/src/proto/prost_gen/errors.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// This file is @generated by prost-build.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TaskGraphValueError {
1 change: 1 addition & 0 deletions vegafusion-core/src/proto/prost_gen/expression.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// This file is @generated by prost-build.
/// ESTree-style AST nodes
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
1 change: 1 addition & 0 deletions vegafusion-core/src/proto/prost_gen/pretransform.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// This file is @generated by prost-build.
/// / Pre transform spec messages
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
1 change: 1 addition & 0 deletions vegafusion-core/src/proto/prost_gen/services.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// This file is @generated by prost-build.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct QueryRequest {
1 change: 1 addition & 0 deletions vegafusion-core/src/proto/prost_gen/tasks.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// This file is @generated by prost-build.
/// ## Task Value
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
1 change: 1 addition & 0 deletions vegafusion-core/src/proto/prost_gen/transforms.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// This file is @generated by prost-build.
/// Filter
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
1 change: 1 addition & 0 deletions vegafusion-core/src/proto/tonic_gen/errors.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// This file is @generated by prost-build.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct TaskGraphValueError {
1 change: 1 addition & 0 deletions vegafusion-core/src/proto/tonic_gen/expression.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// This file is @generated by prost-build.
/// ESTree-style AST nodes
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
1 change: 1 addition & 0 deletions vegafusion-core/src/proto/tonic_gen/pretransform.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// This file is @generated by prost-build.
/// / Pre transform spec messages
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
1 change: 1 addition & 0 deletions vegafusion-core/src/proto/tonic_gen/services.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// This file is @generated by prost-build.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct QueryRequest {
1 change: 1 addition & 0 deletions vegafusion-core/src/proto/tonic_gen/tasks.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// This file is @generated by prost-build.
/// ## Task Value
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
1 change: 1 addition & 0 deletions vegafusion-core/src/proto/tonic_gen/transforms.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// This file is @generated by prost-build.
/// Filter
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
4 changes: 3 additions & 1 deletion vegafusion-dataframe/Cargo.toml
Original file line number Diff line number Diff line change
@@ -24,9 +24,11 @@ workspace = true
[dependencies.datafusion-expr]
workspace = true

[dependencies.datafusion-functions-window]
workspace = true

[dependencies.arrow]
workspace = true
default-features = false

[dependencies.pyo3]
workspace = true
32 changes: 15 additions & 17 deletions vegafusion-dataframe/src/dataframe.rs
Original file line number Diff line number Diff line change
@@ -4,8 +4,9 @@ use arrow::datatypes::{Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use async_trait::async_trait;
use datafusion_common::{DFSchema, ScalarValue};
use datafusion_expr::{expr, BuiltInWindowFunction, Expr, WindowFrame, WindowFunctionDefinition};
use sqlparser::ast::NullTreatment;
use datafusion_expr::expr::WildcardOptions;
use datafusion_expr::{Expr, SortExpr};
use datafusion_functions_window::row_number::row_number;
use std::any::Any;
use std::fmt::{Display, Formatter};
use std::sync::Arc;
@@ -38,7 +39,7 @@ pub trait DataFrame: Send + Sync + 'static {
.with_context(|| String::from("Failed to concatenate RecordBatches"))
}

async fn sort(&self, _exprs: Vec<Expr>, _limit: Option<i32>) -> Result<Arc<dyn DataFrame>> {
async fn sort(&self, _exprs: Vec<SortExpr>, _limit: Option<i32>) -> Result<Arc<dyn DataFrame>> {
Err(VegaFusionError::sql_not_supported("sort not supported"))
}

@@ -87,7 +88,7 @@ pub trait DataFrame: Send + Sync + 'static {
async fn stack(
&self,
_field: &str,
_orderby: Vec<Expr>,
_orderby: Vec<SortExpr>,
_groupby: &[String],
_start_field: &str,
_stop_field: &str,
@@ -110,21 +111,18 @@ pub trait DataFrame: Send + Sync + 'static {
async fn with_index(&self, index_name: &str) -> Result<Arc<dyn DataFrame>> {
if self.schema().column_with_name(index_name).is_some() {
// Column is already present, don't overwrite
self.select(vec![Expr::Wildcard { qualifier: None }]).await
self.select(vec![Expr::Wildcard {
qualifier: None,
options: WildcardOptions::default(),
}])
.await
} else {
let selections = vec![
Expr::WindowFunction(expr::WindowFunction {
fun: WindowFunctionDefinition::BuiltInWindowFunction(
BuiltInWindowFunction::RowNumber,
),
args: vec![],
partition_by: vec![],
order_by: vec![],
window_frame: WindowFrame::new(Some(true)),
null_treatment: Some(NullTreatment::IgnoreNulls),
})
.alias(index_name),
Expr::Wildcard { qualifier: None },
row_number().alias(index_name),
Expr::Wildcard {
qualifier: None,
options: WildcardOptions::default(),
},
];
self.select(selections).await
}
4 changes: 2 additions & 2 deletions vegafusion-datafusion-udfs/src/udafs/mod.rs
Original file line number Diff line number Diff line change
@@ -147,7 +147,7 @@ lazy_static! {
Volatility::Immutable,
// Accumulator factory
Arc::new(|accum_args| Ok(Box::new(PercentileContAccumulator {
data_type: accum_args.data_type.clone(),
data_type: accum_args.return_type.clone(),
all_values: Default::default(),
percentile: 0.25,
}))),
@@ -166,7 +166,7 @@ lazy_static! {
Volatility::Immutable,
// Accumulator factory
Arc::new(|accum_args| Ok(Box::new(PercentileContAccumulator {
data_type: accum_args.data_type.clone(),
data_type: accum_args.return_type.clone(),
all_values: Default::default(),
percentile: 0.75,
}))),
8 changes: 4 additions & 4 deletions vegafusion-python/src/connection.rs
Original file line number Diff line number Diff line change
@@ -63,7 +63,7 @@ fn perform_fetch_query(query: &str, schema: &Schema, conn: &PyObject) -> Result<
#[pyclass]
#[derive(Clone)]
pub struct PySqlConnection {
conn: PyObject,
conn: Arc<PyObject>,
dialect: Dialect,
fallback_conn: Option<Arc<dyn SqlConnection>>,
}
@@ -75,7 +75,7 @@ impl PySqlConnection {
let (dialect, fallback_conn) = get_dialect_and_fallback_connection(&conn)?;

Ok(Self {
conn,
conn: Arc::new(conn),
dialect,
fallback_conn,
})
@@ -365,7 +365,7 @@ impl SqlConnection for PySqlConnection {
#[pyclass]
#[derive(Clone)]
pub struct PySqlDataset {
pub dataset: PyObject,
pub dataset: Arc<PyObject>,
pub dialect: Dialect,
pub table_name: String,
pub table_schema: Schema,
@@ -387,7 +387,7 @@ impl PySqlDataset {
})?;

Ok(Self {
dataset,
dataset: Arc::new(dataset),
dialect,
table_name,
table_schema,
Loading