Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to DataFusion 33 #424

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
237 changes: 121 additions & 116 deletions Cargo.lock

Large diffs are not rendered by default.

21 changes: 11 additions & 10 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,23 @@ members = [
]

[workspace.dependencies]
arrow = { version = "47.0.0", default_features = false }
sqlparser = { version = "0.37.0" }
arrow = { version = "48.0.1", default_features = false }
sqlparser = { version = "0.39.0" }
chrono = { version = "0.4.31", default_features = false }
reqwest = { version = "0.11.22", default-features = false }
tokio = { version = "1.32.0" }
pyo3 = { version = "0.19" }
pyo3 = { version = "0.20.0" }
pythonize = { version = "0.20.0" }
prost = { version = "0.12.1" }
prost-types = { version = "0.12.1" }
object_store = { version="0.7" }
object_store = { version= "0.7.1" }

datafusion = { version = "32.0.0" }
datafusion-common = { version = "32.0.0", default_features = false}
datafusion-expr = { version = "32.0.0" }
datafusion-proto = { version = "32.0.0" }
datafusion-physical-expr = { version = "32.0.0" }
datafusion-optimizer = { version = "32.0.0" }
datafusion = { version = "33.0.0" }
datafusion-common = { version = "33.0.0", default_features = false}
datafusion-expr = { version = "33.0.0" }
datafusion-proto = { version = "33.0.0" }
datafusion-physical-expr = { version = "33.0.0" }
datafusion-optimizer = { version = "33.0.0" }

[profile.release]
## Tell `rustc` to use highest performance optimization and perform Link Time Optimization
Expand Down
79 changes: 67 additions & 12 deletions vegafusion-common/src/data/scalar.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use crate::error::{Result, VegaFusionError};
use arrow::array::{new_empty_array, Array, ArrayRef, ListArray};
use datafusion_common::DataFusionError;

use datafusion_common::utils::array_into_list_array;
pub use datafusion_common::ScalarValue;

#[cfg(feature = "json")]
use {
arrow::datatypes::{DataType, Field},
arrow::datatypes::DataType,
serde_json::{Map, Value},
std::ops::Deref,
std::sync::Arc,
Expand Down Expand Up @@ -62,18 +65,18 @@ impl ScalarValueHelpers for ScalarValue {
}
}
Value::Array(elements) => {
let (elements, dtype) = if elements.is_empty() {
(Vec::new(), DataType::Float64)
let array: ListArray = if elements.is_empty() {
array_into_list_array(Arc::new(new_empty_array(&DataType::Float64)))
} else {
let elements: Vec<_> = elements
.iter()
.map(ScalarValue::from_json)
.collect::<Result<Vec<ScalarValue>>>()?;
let dtype = elements[0].data_type();
(elements, dtype)

array_into_list_array(ScalarValue::iter_to_array(elements)?)
};

ScalarValue::List(Some(elements), Arc::new(Field::new("item", dtype, true)))
ScalarValue::List(Arc::new(array))
}
};
Ok(scalar_value)
Expand Down Expand Up @@ -132,13 +135,12 @@ impl ScalarValueHelpers for ScalarValue {
ScalarValue::IntervalDayTime(Some(_v)) => {
unimplemented!()
}
ScalarValue::List(Some(v), _) => Value::Array(
v.clone()
.into_iter()
.map(|v| v.to_json())
ScalarValue::List(a) => Value::Array(
a.list_el_to_scalar_vec()?
.iter()
.map(|s| s.to_json())
.collect::<Result<Vec<_>>>()?,
),
ScalarValue::List(None, _) => Value::Array(Vec::new()),
ScalarValue::Struct(Some(v), fields) => {
let mut pairs: Map<String, Value> = Default::default();
for (val, field) in v.iter().zip(fields.deref()) {
Expand Down Expand Up @@ -173,7 +175,8 @@ impl ScalarValueHelpers for ScalarValue {
}

fn to_f64x2(&self) -> Result<[f64; 2]> {
if let ScalarValue::List(Some(elements), _) = self {
if let ScalarValue::List(array) = self {
let elements = array.list_el_to_scalar_vec()?;
if let [v0, v1] = elements.as_slice() {
return Ok([v0.to_f64()?, v1.to_f64()?]);
}
Expand All @@ -195,3 +198,55 @@ impl ScalarValueHelpers for ScalarValue {
})
}
}

/// Helper conversions between `ArrayRef` and `Vec<ScalarValue>`, used to bridge
/// the DataFusion 33 `ScalarValue::List(Arc<ListArray>)` representation with the
/// pre-33 `Vec<ScalarValue>` based code paths in this crate.
///
/// The `list_el_*` methods assume `self` is a single-element `ListArray`, which is
/// the shape stored inside `ScalarValue::List(arr)`.
pub trait ArrayRefHelpers {
    /// Convert every element of the array into a `ScalarValue`.
    fn to_scalar_vec(&self) -> std::result::Result<Vec<ScalarValue>, DataFusionError>;

    /// Extract the scalars of the first (only) list element of a single-element `ListArray`.
    fn list_el_to_scalar_vec(&self) -> std::result::Result<Vec<ScalarValue>, DataFusionError>;

    /// Length of the first (only) list element of a single-element `ListArray`.
    fn list_el_len(&self) -> std::result::Result<usize, DataFusionError>;

    /// Data type of the first (only) list element of a single-element `ListArray`.
    fn list_el_dtype(&self) -> std::result::Result<DataType, DataFusionError>;
}

impl ArrayRefHelpers for ArrayRef {
    /// Convert ArrayRef into vector of ScalarValues
    fn to_scalar_vec(&self) -> std::result::Result<Vec<ScalarValue>, DataFusionError> {
        (0..self.len())
            .map(|i| ScalarValue::try_from_array(self, i))
            .collect::<std::result::Result<Vec<_>, DataFusionError>>()
    }

    /// Extract Vec<ScalarValue> for single element ListArray (as is stored inside ScalarValue::List(arr))
    ///
    /// NOTE(review): assumes the ListArray has at least one element, as produced by
    /// `ScalarValue::List` — `value(0)` panics on an empty ListArray. TODO confirm
    /// no caller passes an empty array.
    fn list_el_to_scalar_vec(&self) -> std::result::Result<Vec<ScalarValue>, DataFusionError> {
        let a = self
            .as_any()
            .downcast_ref::<ListArray>()
            // ok_or_else: avoid allocating the error String on the success path
            .ok_or_else(|| {
                DataFusionError::Internal(
                    "list_el_to_scalar_vec called on non-List type".to_string(),
                )
            })?;
        a.value(0).to_scalar_vec()
    }

    /// Extract length of single element ListArray
    fn list_el_len(&self) -> std::result::Result<usize, DataFusionError> {
        let a = self
            .as_any()
            .downcast_ref::<ListArray>()
            .ok_or_else(|| {
                DataFusionError::Internal("list_el_len called on non-List type".to_string())
            })?;
        Ok(a.value(0).len())
    }

    /// Extract data type of single element ListArray
    fn list_el_dtype(&self) -> std::result::Result<DataType, DataFusionError> {
        let a = self
            .as_any()
            .downcast_ref::<ListArray>()
            // Fixed copy-paste bug: error message previously said "list_el_len",
            // misreporting the failing method.
            .ok_or_else(|| {
                DataFusionError::Internal("list_el_dtype called on non-List type".to_string())
            })?;
        Ok(a.value(0).data_type().clone())
    }
}
29 changes: 6 additions & 23 deletions vegafusion-common/src/data/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
error::{Result, ResultWithContext, VegaFusionError},
};

use arrow::array::new_empty_array;
#[cfg(feature = "prettyprint")]
use arrow::util::pretty::pretty_format_batches;
use std::{
Expand Down Expand Up @@ -41,6 +42,7 @@

#[cfg(feature = "base64")]
use base64::{engine::general_purpose, Engine as _};
use datafusion_common::utils::array_into_list_array;

#[derive(Clone, Debug)]
pub struct VegaFusionTable {
Expand Down Expand Up @@ -182,30 +184,11 @@
pub fn to_scalar_value(&self) -> Result<ScalarValue> {
if self.num_rows() == 0 {
// Return empty list with (arbitrary) Float64 type
let dtype = DataType::Float64;
return Ok(ScalarValue::List(
Some(Vec::new()),
Arc::new(Field::new("item", dtype, true)),
));
let array = Arc::new(new_empty_array(&DataType::Float64));
return Ok(ScalarValue::List(Arc::new(array_into_list_array(array))));
}

let mut elements: Vec<ScalarValue> = Vec::new();
for batch in &self.batches {
let array = Arc::new(StructArray::from(batch.clone())) as ArrayRef;

for i in 0..array.len() {
let scalar = ScalarValue::try_from_array(&array, i).with_context(|| {
"Failed to convert record batch row to ScalarValue".to_string()
})?;
elements.push(scalar)
}
}

let dtype = elements[0].data_type();
Ok(ScalarValue::List(
Some(elements),
Arc::new(Field::new("item", dtype, true)),
))
let array = Arc::new(StructArray::from(self.to_record_batch()?)) as ArrayRef;
Ok(ScalarValue::List(Arc::new(array_into_list_array(array))))
}

#[cfg(feature = "json")]
Expand Down Expand Up @@ -252,7 +235,7 @@
ScalarValue::try_from(&DataType::Float64).unwrap(),
)]);
let array = empty_scalar.to_array_of_size(values.len());
let struct_array = array.as_any().downcast_ref::<StructArray>().unwrap();

Check failure on line 238 in vegafusion-common/src/data/table.rs

View workflow job for this annotation

GitHub Actions / build-vegafusion-server-linux-arm64

no method named `as_any` found for enum `std::result::Result` in the current scope
let record_batch = RecordBatch::from(struct_array);
Self::try_new(record_batch.schema(), vec![record_batch])
}
Expand Down
12 changes: 1 addition & 11 deletions vegafusion-core/src/task_graph/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,7 @@ pub fn inner_size_of_scalar(value: &ScalarValue) -> usize {
ScalarValue::LargeUtf8(Some(s)) => size_of_val(s.as_bytes()) + size_of::<String>(),
ScalarValue::Binary(Some(b)) => size_of_val(b.as_slice()) + size_of::<Vec<u8>>(),
ScalarValue::LargeBinary(Some(b)) => size_of_val(b.as_slice()) + size_of::<Vec<u8>>(),
ScalarValue::List(Some(values), field) => {
let values_bytes: usize = size_of::<Vec<ScalarValue>>()
+ values
.iter()
.map(|v| size_of::<ScalarValue>() + inner_size_of_scalar(v))
.sum::<usize>();

let dtype_bytes = size_of::<DataType>() + inner_size_of_dtype(field.data_type());

values_bytes + dtype_bytes
}
ScalarValue::List(array) => size_of::<Vec<ScalarValue>>() + size_of_array_ref(array),
ScalarValue::Struct(Some(values), fields) => {
let values_bytes: usize = size_of::<Vec<ScalarValue>>()
+ values
Expand Down
2 changes: 1 addition & 1 deletion vegafusion-core/src/task_graph/task_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl TryFrom<&TaskValue> for ProtoTaskValue {
fn try_from(value: &TaskValue) -> std::result::Result<Self, Self::Error> {
match value {
TaskValue::Scalar(scalar) => {
let scalar_array = scalar.to_array();
let scalar_array = scalar.to_array()?;
let scalar_rb = RecordBatch::try_from_iter(vec![("value", scalar_array)])?;
let ipc_bytes = VegaFusionTable::from(scalar_rb).to_ipc_bytes()?;
Ok(Self {
Expand Down
4 changes: 2 additions & 2 deletions vegafusion-dataframe/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ pub trait DataFrame: Send + Sync + 'static {
async fn with_index(&self, index_name: &str) -> Result<Arc<dyn DataFrame>> {
if self.schema().column_with_name(index_name).is_some() {
// Column is already present, don't overwrite
self.select(vec![Expr::Wildcard]).await
self.select(vec![Expr::Wildcard { qualifier: None }]).await
} else {
let selections = vec![
Expr::WindowFunction(expr::WindowFunction {
Expand All @@ -129,7 +129,7 @@ pub trait DataFrame: Send + Sync + 'static {
},
})
.alias(index_name),
Expr::Wildcard,
Expr::Wildcard { qualifier: None },
];
self.select(selections).await
}
Expand Down
11 changes: 5 additions & 6 deletions vegafusion-datafusion-udfs/src/udafs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::sync::Arc;
use vegafusion_common::arrow::array::{Array, ArrayRef, UInt32Array};
use vegafusion_common::arrow::compute::sort_to_indices;
use vegafusion_common::arrow::datatypes::{DataType, Field, FieldRef};
use vegafusion_common::data::scalar::ScalarValueHelpers;
use vegafusion_common::data::scalar::{ArrayRefHelpers, ScalarValueHelpers};
use vegafusion_common::datafusion_common::{DataFusionError, ScalarValue};
use vegafusion_common::datafusion_expr::{create_udaf, Accumulator, AggregateUDF, Volatility};

Expand All @@ -19,8 +19,8 @@ pub(crate) struct PercentileContAccumulator {

impl Accumulator for PercentileContAccumulator {
fn state(&self) -> Result<Vec<ScalarValue>, DataFusionError> {
let state = ScalarValue::new_list(Some(self.all_values.clone()), self.data_type.clone());
Ok(vec![state])
let state = ScalarValue::new_list(self.all_values.as_slice(), &self.data_type);
Ok(vec![ScalarValue::List(Arc::new(state))])
}

fn update_batch(&mut self, values: &[ArrayRef]) -> Result<(), DataFusionError> {
Expand All @@ -43,14 +43,13 @@ impl Accumulator for PercentileContAccumulator {
assert!(matches!(array.data_type(), DataType::List(_)));
for index in 0..array.len() {
match ScalarValue::try_from_array(array, index)? {
ScalarValue::List(Some(values), _) => {
for scalar in values {
ScalarValue::List(array) => {
for scalar in array.list_el_to_scalar_vec()? {
if !scalar_is_non_finite(&scalar) {
self.all_values.push(scalar);
}
}
}
ScalarValue::List(None, _) => {} // skip empty state
v => {
return Err(DataFusionError::Internal(format!(
"unexpected state in percentile_cont. Expected DataType::List, got {v:?}"
Expand Down
7 changes: 4 additions & 3 deletions vegafusion-datafusion-udfs/src/udfs/array/indexof.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use vegafusion_common::arrow::array::{
};
use vegafusion_common::arrow::compute::cast;
use vegafusion_common::arrow::datatypes::DataType;
use vegafusion_common::data::scalar::ScalarValueHelpers;
use vegafusion_common::data::scalar::{ArrayRefHelpers, ScalarValueHelpers};
use vegafusion_common::datafusion_common::{DataFusionError, ScalarValue};
use vegafusion_common::datafusion_expr::{
ColumnarValue, ReturnTypeFunction, ScalarFunctionImplementation, ScalarUDF, Signature,
Expand All @@ -24,8 +24,9 @@ fn make_indexof_udf() -> ScalarUDF {
let indexof_fn: ScalarFunctionImplementation = Arc::new(|args: &[ColumnarValue]| {
// Signature ensures there is a single argument
let (array, array_dtype) = match &args[0] {
ColumnarValue::Scalar(ScalarValue::List(Some(scalar_array), field)) => {
(scalar_array.clone(), field.data_type().clone())
ColumnarValue::Scalar(ScalarValue::List(array)) => {
let scalar_array = array.list_el_to_scalar_vec()?;
(scalar_array, array.list_el_dtype()?)
}
_ => {
return Err(DataFusionError::Internal(
Expand Down
5 changes: 3 additions & 2 deletions vegafusion-datafusion-udfs/src/udfs/array/length.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ fn make_length_udf() -> ScalarUDF {
Ok(match arg {
ColumnarValue::Scalar(value) => {
match value {
ScalarValue::List(Some(arr), _) => {
ColumnarValue::Scalar(ScalarValue::from(arr.len() as f64))
ScalarValue::List(arr) => {
let arr = arr.as_any().downcast_ref::<ListArray>().unwrap();
ColumnarValue::Scalar(ScalarValue::from(arr.value(0).len() as f64))
}
ScalarValue::Utf8(Some(s)) | ScalarValue::LargeUtf8(Some(s)) => {
ColumnarValue::Scalar(ScalarValue::from(s.len() as f64))
Expand Down
21 changes: 16 additions & 5 deletions vegafusion-datafusion-udfs/src/udfs/array/span.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::sync::Arc;
use vegafusion_common::arrow::array::{Array, ListArray};
use vegafusion_common::arrow::datatypes::{DataType, Field};
use vegafusion_common::data::scalar::ScalarValueHelpers;
use vegafusion_common::datafusion_common::{DataFusionError, ScalarValue};
Expand All @@ -25,21 +26,31 @@ fn make_span_udf() -> ScalarUDF {
// Span of scalar (including null) is 0
ColumnarValue::Scalar(ScalarValue::from(0.0))
}
ScalarValue::List(Some(arr), element_type) => {
match element_type.data_type() {
ScalarValue::List(arr) => {
// Unwrap single element ListArray
let arr = arr.as_any().downcast_ref::<ListArray>().unwrap();
let arr = arr.value(0);
match arr.data_type() {
DataType::Float64 => {
if arr.is_empty() {
// Span of empty array is 0
ColumnarValue::Scalar(ScalarValue::from(0.0))
} else {
let first = arr.first().unwrap().to_f64().unwrap();
let last = arr.last().unwrap().to_f64().unwrap();
let first = ScalarValue::try_from_array(&arr, 0)
.unwrap()
.to_f64()
.unwrap();
let last = ScalarValue::try_from_array(&arr, arr.len() - 1)
.unwrap()
.to_f64()
.unwrap();
ColumnarValue::Scalar(ScalarValue::from(last - first))
}
}
_ => {
return Err(DataFusionError::Internal(format!(
"Unexpected element type for span function: {element_type}"
"Unexpected element type for span function: {}",
arr.data_type()
)))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ fn make_date_part_tz_udf() -> ScalarUDF {
// [1] data array
let timestamp_array = match &args[1] {
ColumnarValue::Array(array) => array.clone(),
ColumnarValue::Scalar(scalar) => scalar.to_array(),
ColumnarValue::Scalar(scalar) => scalar.to_array()?,
};

let timestamp_array = to_timestamp_ms(&timestamp_array)?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ fn make_date_to_utc_timestamp() -> ScalarUDF {
// [0] data array
let date_array = match &args[0] {
ColumnarValue::Array(array) => array.clone(),
ColumnarValue::Scalar(scalar) => scalar.to_array(),
ColumnarValue::Scalar(scalar) => scalar.to_array()?,
};

// [1] timezone string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ fn make_epoch_to_utc_timestamp() -> ScalarUDF {
// [0] data array
let timestamp_array = match &args[0] {
ColumnarValue::Array(array) => array.clone(),
ColumnarValue::Scalar(scalar) => scalar.to_array(),
ColumnarValue::Scalar(scalar) => scalar.to_array()?,
};

let timestamp_array = cast(
Expand Down
Loading
Loading