chore: upgrade datafusion and arrow-rs #1390

Closed · wants to merge 2 commits
4 changes: 2 additions & 2 deletions integration/duckdb_lance/Cargo.toml
@@ -8,8 +8,8 @@ lance = { path = "../../rust/lance" }
 duckdb-ext = { path = "./duckdb-ext" }
 lazy_static = "1.4.0"
 tokio = { version = "1.23", features = ["rt-multi-thread"] }
-arrow-schema = "43.0.0"
-arrow-array = "43.0.0"
+arrow-schema = "46.0.0"
+arrow-array = "46.0.0"
 futures = "0.3"
 num-traits = "0.2"

8 changes: 4 additions & 4 deletions python/Cargo.toml
@@ -11,10 +11,10 @@ name = "lance"
 crate-type = ["cdylib"]

 [dependencies]
-arrow = { version = "43.0.0", features = ["pyarrow"] }
-arrow-array = "43.0"
-arrow-data = "43.0"
-arrow-schema = "43.0"
+arrow = { version = "46.0.0", features = ["pyarrow"] }
+arrow-array = "46.0"
+arrow-data = "46.0"
+arrow-schema = "46.0"
 async-trait = "0.1"
 chrono = "0.4.31"
 env_logger = "0.10"
25 changes: 13 additions & 12 deletions rust/Cargo.toml
@@ -42,23 +42,24 @@ lance-linalg = { version = "=0.8.3", path = "./lance-linalg" }
 lance-testing = { version = "=0.8.3", path = "./lance-testing" }
 lance-test-macros = { version = "=0.8.3", path = "./lance-test-macros" }
 # Note that this one does not include pyarrow
-arrow = { version = "43.0.0", optional = false }
-arrow-arith = "43.0"
-arrow-array = "43.0"
-arrow-buffer = "43.0"
-arrow-cast = "43.0"
-arrow-data = "43.0"
-arrow-ord = "43.0"
-arrow-row = "43.0"
-arrow-schema = "43.0"
-arrow-select = "43.0"
+arrow = { version = "46.0.0", optional = false }
+arrow-arith = "46.0"
Contributor:
IIRC, we are waiting for arrow 47 for a few fixes? And datafusion 32 is being voted on at the moment, right?

Contributor Author (@wjones127, Oct 11, 2023):
Why wait? Is something broken in 46?

Contributor:
Do we need to bump datafusion again in 3-5 days to include the fixes?

Contributor Author (@wjones127):
🤷 Yes, we need to upgrade again for the next version. IMO we should upgrade on every datafusion release.
+arrow-array = "46.0"
+arrow-buffer = "46.0"
+arrow-cast = "46.0"
+arrow-data = "46.0"
+arrow-ipc = { version = "46.0", features = ["zstd"] }
+arrow-ord = "46.0"
+arrow-row = "46.0"
+arrow-schema = "46.0"
+arrow-select = "46.0"
 half = { "version" = "=2.2.1", default-features = false, features = [
     "num-traits",
 ] }
 approx = "0.5.1"
 criterion = { version = "0.5", features = ["async", "async_tokio"] }
-datafusion-common = "28.0"
-datafusion-sql = "28.0"
+datafusion-common = "31.0"
+datafusion-sql = "31.0"
 either = "1.0"
 futures = "0.3"
 log = "0.4"
7 changes: 4 additions & 3 deletions rust/lance-linalg/benches/dot.rs
@@ -14,8 +14,9 @@

 use std::iter::Sum;

-use arrow_arith::{aggregate::sum, arithmetic::multiply};
+use arrow_arith::{aggregate::sum, numeric::mul};
 use arrow_array::{
+    cast::AsArray,
     types::{Float16Type, Float32Type, Float64Type},
     ArrowNumericType, Float16Array, Float32Array, NativeAdapter, PrimitiveArray,
 };
@@ -30,8 +31,8 @@ use lance_testing::datagen::generate_random_array_with_seed;

 #[inline]
 fn dot_arrow_artiy<T: ArrowNumericType>(x: &PrimitiveArray<T>, y: &PrimitiveArray<T>) -> T::Native {
-    let m = multiply(x, y).unwrap();
-    sum(&m).unwrap()
+    let m = mul(x, y).unwrap();
+    sum(m.as_primitive::<T>()).unwrap()
 }

fn run_bench<T: ArrowNumericType>(c: &mut Criterion)
14 changes: 9 additions & 5 deletions rust/lance-linalg/benches/l2.rs
@@ -14,10 +14,14 @@

 use arrow_arith::{
     aggregate::sum,
-    arithmetic::{multiply, subtract},
+    arity::binary,
+    numeric::{mul, sub},
 };
-use arrow_array::{cast::as_primitive_array, Float32Array};
+use arrow_array::{
+    cast::{as_primitive_array, AsArray},
+    types::Float32Type,
+    Float32Array,
+};
 use criterion::{criterion_group, criterion_main, Criterion};

#[cfg(target_os = "linux")]
@@ -28,9 +32,9 @@ use lance_testing::datagen::generate_random_array_with_seed;

 #[inline]
 fn l2_arrow(x: &Float32Array, y: &Float32Array) -> f32 {
-    let s = subtract(x, y).unwrap();
-    let m = multiply(&s, &s).unwrap();
-    sum(&m).unwrap()
+    let s = sub(x, y).unwrap();
+    let m = mul(&s, &s).unwrap();
+    sum(m.as_primitive::<Float32Type>()).unwrap()
 }

#[inline]
6 changes: 3 additions & 3 deletions rust/lance/Cargo.toml
@@ -26,7 +26,7 @@ arrow-array = { workspace = true }
 arrow-buffer = { workspace = true }
 arrow-cast = { workspace = true }
 arrow-data = { workspace = true }
-arrow-ipc = { version = "43.0", features = ["zstd"] }
+arrow-ipc = { workspace = true }
 arrow-ord = { workspace = true }
 arrow-row = { workspace = true }
 arrow-schema = { workspace = true }
@@ -56,10 +56,10 @@ rand = { version = "0.8.3", features = ["small_rng"] }
 futures = { workspace = true }
 uuid = { version = "1.2", features = ["v4"] }
 shellexpand = "3.0.0"
-arrow = { version = "43.0.0", features = ["prettyprint"] }
+arrow = { version = "46.0.0", features = ["prettyprint"] }
 num_cpus = "1.0"
 # TODO: use datafusion sub-modules to reduce build size?
-datafusion = { version = "28.0.0", default-features = false, features = [
+datafusion = { version = "31.0.0", default-features = false, features = [
     "regex_expressions",
 ] }
 lapack = { version = "0.19.0", optional = true }
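The `arrow-ipc` change above replaces this crate's own version pin with `workspace = true`, inheriting the `arrow-ipc = { version = "46.0", features = ["zstd"] }` entry added to the workspace root in rust/Cargo.toml, so member crates cannot drift to different arrow versions. A minimal sketch of that Cargo pattern (paths illustrative):

```toml
# rust/Cargo.toml (workspace root): declare the version once
[workspace.dependencies]
arrow-ipc = { version = "46.0", features = ["zstd"] }

# rust/lance/Cargo.toml (member crate): inherit the shared pin
[dependencies]
arrow-ipc = { workspace = true }
```

Workspace dependency inheritance requires Cargo from Rust 1.64 or newer.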
6 changes: 3 additions & 3 deletions rust/lance/src/dataset/fragment.rs
@@ -661,8 +661,8 @@ impl FragmentReader {
 #[cfg(test)]
 mod tests {

-    use arrow_arith::arithmetic::multiply_scalar;
-    use arrow_array::{cast::AsArray, ArrayRef, Int32Array, RecordBatchIterator, StringArray};
+    use arrow_arith::numeric::mul;
+    use arrow_array::{ArrayRef, Int32Array, RecordBatchIterator, StringArray};
     use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema};
     use arrow_select::concat::concat_batches;
     use futures::TryStreamExt;
@@ -949,7 +949,7 @@ mod tests {
     )]));
     while let Some(batch) = updater.next().await.unwrap() {
         let input_col = batch.column_by_name("i").unwrap();
-        let result_col: Int32Array = multiply_scalar(input_col.as_primitive(), 2).unwrap();
+        let result_col = mul(input_col, &Int32Array::new_scalar(2)).unwrap();
         let batch = RecordBatch::try_new(
             new_schema.clone(),
             vec![Arc::new(result_col) as ArrayRef],
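The recurring migration in this PR is that arrow-rs dropped the dedicated `*_scalar`/`*_dyn` kernels (`multiply_scalar`, `subtract_scalar`, `subtract_dyn`) in favor of the `arrow_arith::numeric` kernels, which take `&dyn Datum` so a single function covers both array-vs-array and array-vs-scalar, as in the `mul(input_col, &Int32Array::new_scalar(2))` call above. A dependency-free sketch of the idea follows; this `Datum` trait is a simplified stand-in for illustration, not the real arrow-rs API:

```rust
/// Simplified stand-in for arrow's `Datum`: either a full column or a
/// single value that is broadcast to the other operand's length.
trait Datum {
    /// Returns (values, is_scalar). A scalar holds one value.
    fn get(&self) -> (&[i32], bool);
}

struct ArrayDatum(Vec<i32>);
struct ScalarDatum(i32);

impl Datum for ArrayDatum {
    fn get(&self) -> (&[i32], bool) {
        (&self.0, false)
    }
}

impl Datum for ScalarDatum {
    fn get(&self) -> (&[i32], bool) {
        (std::slice::from_ref(&self.0), true)
    }
}

/// One `mul` kernel serves array*array and array*scalar, replacing the
/// old dedicated `multiply_scalar` entry point.
fn mul(lhs: &dyn Datum, rhs: &dyn Datum) -> Vec<i32> {
    let (l, l_scalar) = lhs.get();
    let (r, r_scalar) = rhs.get();
    // Broadcast: the output length comes from the non-scalar side.
    let len = if l_scalar { r.len() } else { l.len() };
    (0..len)
        .map(|i| {
            let a = l[if l_scalar { 0 } else { i }];
            let b = r[if r_scalar { 0 } else { i }];
            a * b
        })
        .collect()
}

fn main() {
    let col = ArrayDatum(vec![1, 2, 3]);
    let two = ScalarDatum(2);
    assert_eq!(mul(&col, &two), vec![2, 4, 6]);
}
```

The same shape explains the `sub(&slice, &start_scalar)` and `sub(indices, &UInt32Array::new_scalar(start))` rewrites later in this diff: the scalar operand is wrapped once and passed to the generic kernel.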
19 changes: 12 additions & 7 deletions rust/lance/src/encodings/binary.rs
@@ -20,13 +20,15 @@ use std::ops::{Range, RangeFrom, RangeFull, RangeTo};
 use std::ptr::NonNull;
 use std::sync::Arc;

-use arrow_arith::arithmetic::subtract_scalar;
+use arrow_arith::numeric::sub;
 use arrow_array::builder::{ArrayBuilder, PrimitiveBuilder};
+use arrow_array::cast::AsArray;
 use arrow_array::{
     cast::as_primitive_array,
     new_empty_array,
-    types::{BinaryType, ByteArrayType, Int64Type, LargeBinaryType, LargeUtf8Type, Utf8Type},
+    types::{
+        BinaryType, ByteArrayType, Int64Type, LargeBinaryType, LargeUtf8Type, UInt32Type, Utf8Type,
+    },
     Array, ArrayRef, GenericByteArray, Int64Array, OffsetSizeTrait, UInt32Array,
 };
 use arrow_buffer::{bit_util, ArrowNativeType, Buffer, MutableBuffer, ScalarBuffer};
@@ -214,12 +216,14 @@ impl<'a, T: ByteArrayType> BinaryDecoder<'a, T> {
         let start = positions.value(range.start);
         let end = positions.value(range.end);

+        let start_scalar = Int64Array::new_scalar(start);
+
         let slice = positions.slice(range.start, range.len() + 1);
         let offset_data = if T::Offset::IS_LARGE {
-            subtract_scalar(&slice, start)?.into_data()
+            sub(&slice, &start_scalar)?.into_data()
         } else {
             cast(
-                &(Arc::new(subtract_scalar(&slice, start)?) as ArrayRef),
+                &(Arc::new(sub(&slice, &start_scalar)?) as ArrayRef),
                 &DataType::Int32,
             )?
             .into_data()
@@ -281,18 +285,19 @@ fn plan_take_chunks(
     min_io_size: i64,
 ) -> Result<Vec<TakeChunksPlan>> {
     let start = indices.value(0);
-    let indices = subtract_scalar(indices, start)?;
+    let indices = sub(indices, &UInt32Array::new_scalar(start))?;
+    let indices_ref = indices.as_primitive::<UInt32Type>();

     let mut chunks: Vec<TakeChunksPlan> = vec![];
     let mut start_idx = 0;
     let mut last_idx: i64 = -1;
     let mut is_contiguous = true;
     for i in 0..indices.len() {
-        let current = indices.value(i) as usize;
+        let current = indices_ref.value(i) as usize;
         let curr_contiguous = current == start_idx || current as i64 - last_idx == 1;

         if !curr_contiguous
-            && positions.value(current) - positions.value(indices.value(start_idx) as usize)
+            && positions.value(current) - positions.value(indices_ref.value(start_idx) as usize)
                 > min_io_size
         {
             chunks.push(TakeChunksPlan {
6 changes: 3 additions & 3 deletions rust/lance/src/encodings/plain.rs
@@ -23,7 +23,7 @@ use std::slice::from_raw_parts;
 use std::sync::Arc;

 use arrow::array::{as_boolean_array, BooleanBuilder};
-use arrow_arith::arithmetic::subtract_scalar;
+use arrow_arith::numeric::sub;
 use arrow_array::cast::as_primitive_array;
 use arrow_array::{
     make_array, new_empty_array, Array, ArrayRef, BooleanArray, FixedSizeBinaryArray,
@@ -342,7 +342,7 @@ impl<'a> PlainDecoder<'a> {
             let end = request.value(request.len() - 1);
             let array = self.get(start as usize..end as usize + 1).await?;

-            let shifted_indices = subtract_scalar(request, start)?;
+            let shifted_indices = sub(request, &UInt32Array::new_scalar(start))?;
             Ok::<ArrayRef, Error>(take(&array, &shifted_indices, None)?)
         })
         .buffered(num_cpus::get())
@@ -405,7 +405,7 @@ impl<'a> Decoder for PlainDecoder<'a> {
             let start = request.value(0);
             let end = request.value(request.len() - 1);
             let array = self.get(start as usize..end as usize + 1).await?;
-            let adjusted_offsets = subtract_scalar(request, start)?;
+            let adjusted_offsets = sub(request, &UInt32Array::new_scalar(start))?;
             Ok::<ArrayRef, Error>(take(&array, &adjusted_offsets, None)?)
         })
         .buffered(num_cpus::get() * PARALLELISM_FACTOR)
4 changes: 2 additions & 2 deletions rust/lance/src/index/vector/ivf.rs
@@ -16,7 +16,7 @@

 use std::{any::Any, sync::Arc};

-use arrow_arith::arithmetic::subtract_dyn;
+use arrow_arith::numeric::sub;
 use arrow_array::{
     cast::{as_primitive_array, as_struct_array, AsArray},
     types::Float32Type,
@@ -125,7 +125,7 @@ impl IVFIndex {
         };

         let partition_centroids = self.ivf.centroids.value(partition_id);
-        let residual_key = subtract_dyn(query.key.as_ref(), &partition_centroids)?;
+        let residual_key = sub(query.key.as_ref(), &partition_centroids)?;
         // Query in partition.
         let mut part_query = query.clone();
         part_query.key = as_primitive_array(&residual_key).clone().into();
19 changes: 10 additions & 9 deletions rust/lance/src/io/reader.rs
@@ -19,7 +19,7 @@ use std::borrow::Cow;
 use std::ops::{Range, RangeTo};
 use std::sync::Arc;

-use arrow_arith::arithmetic::subtract_scalar;
+use arrow_arith::numeric::sub;
 use arrow_array::{
     builder::PrimitiveBuilder,
     cast::AsArray,
@@ -954,8 +954,9 @@
         }
     };

-    let start_position = positions.value(0);
-    let offset_arr = subtract_scalar(positions, start_position)?;
+    let start_position = PrimitiveArray::<T>::new_scalar(positions.value(0));
+    let offset_arr = sub(positions, &start_position)?;
+    let offset_arr_ref = offset_arr.as_primitive::<T>();
     let value_arrs = read_array(
         reader,
         &field.children[0],
@@ -964,7 +965,7 @@
         &value_params,
     )
     .await?;
-    let arr = try_new_generic_list_array(value_arrs, &offset_arr)?;
+    let arr = try_new_generic_list_array(value_arrs, offset_arr_ref)?;
     Ok(Arc::new(arr) as ArrayRef)
 }

@@ -1528,22 +1529,22 @@ mod tests {

     let arr = read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeFull).await;
     assert_eq!(100, arr.len());
-    assert_eq!(100, arr.null_count());
+    assert_eq!(arr.data_type(), &DataType::Null);
Contributor Author (@wjones127):
null_count() no longer returns the logical null count, just the number of "physical nulls": apache/arrow-rs#4691

     let arr =
         read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::Range(10..25)).await;
     assert_eq!(15, arr.len());
-    assert_eq!(15, arr.null_count());
+    assert_eq!(arr.data_type(), &DataType::Null);

     let arr =
         read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeFrom(60..)).await;
     assert_eq!(40, arr.len());
-    assert_eq!(40, arr.null_count());
+    assert_eq!(arr.data_type(), &DataType::Null);

     let arr =
         read_array_w_params(&reader, &schema.fields[1], ReadBatchParams::RangeTo(..25)).await;
     assert_eq!(25, arr.len());
-    assert_eq!(25, arr.null_count());
+    assert_eq!(arr.data_type(), &DataType::Null);

     let arr = read_array_w_params(
         &reader,
@@ -1552,7 +1553,7 @@
     )
     .await;
     assert_eq!(4, arr.len());
-    assert_eq!(4, arr.null_count());
+    assert_eq!(arr.data_type(), &DataType::Null);

     // raise error if take indices are out of bounds
     let params = ReadBatchParams::Indices(UInt32Array::from(vec![1, 9, 30, 72, 100]));
1 change: 1 addition & 0 deletions rust/lance/src/utils/tfrecord.rs
@@ -18,6 +18,7 @@
 //! [read_tfrecord] to read the file into an Arrow record batch stream.

 use arrow::buffer::OffsetBuffer;
+use arrow::datatypes::ArrowPrimitiveType;
 use arrow_array::builder::PrimitiveBuilder;
 use arrow_array::{ArrayRef, FixedSizeListArray, ListArray};
 use arrow_buffer::ScalarBuffer;