Update ObjectStore 0.7.0 and Arrow 46.0.0 #7282
Conversation
@@ -19,8 +19,6 @@
mod arrow_file;
mod avro;
#[cfg(test)]
mod chunked_store;
This was moved upstream
for w in writers.iter_mut() {

// Must drop the stream before creating ObjectMeta below as drop
// triggers finish for ZstdEncoder which writes additional data
We previously got away with this as the requested range was being ignored by LocalFileSystem
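To illustrate the drop-ordering issue, here is a minimal sketch using flate2's synchronous GzEncoder as a stand-in for the async ZstdEncoder in the test (both write trailing bytes when finished, which is the property that matters):

```rust
use std::io::Write;
use flate2::{write::GzEncoder, Compression};

fn main() {
    let mut buf = Vec::new();
    {
        let mut enc = GzEncoder::new(&mut buf, Compression::default());
        enc.write_all(b"hello world").unwrap();
        // Finishing the encoder (which also happens on drop) appends the
        // gzip footer; a length captured before this point would be stale.
        enc.finish().unwrap();
    }
    // Only now does buf.len() reflect the complete file size, i.e. the
    // value that is safe to record in ObjectMeta-style metadata.
    assert!(!buf.is_empty());
}
```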
let is_whole_file_scanned = file_meta.range.is_none();
let decoder = if is_whole_file_scanned {
// For special case: `get_range()` will interpret `start` and `end` as the
// byte range after decompression for compressed files
// Don't seek if no range as breaks FIFO files
This is at best a hack; I'm not really sure how to coherently support FIFO files, and I do wonder whether this support really belongs in DataFusion proper.
FYI @metesynnada -- I wonder if you have thoughts about moving FIFO support into a more separated boundary -- I wonder if we could make a special interface that handles incremental streaming somehow, and then implement FIFO support for that interface 🤔
I apologize for seeing your comment late. I will bring up the topic with @ozankabak for further discussion. Additionally, I wanted to ask whether you would recommend a so-called streaming_store rather than an object_store to accommodate streaming use cases.
I think the question is perhaps more whether the streaming operators should be operators in their own right, instead of both streaming and non-streaming use-cases using CsvExec. Perhaps we could introduce a FileStreamExec or something? Both could still make use of object_store and arrow-csv under the hood, but separating them would perhaps better accommodate divergent functionality like schema inference, parallel reads, late materialisation, etc... that doesn't work in the same way for streams?
I dunno, just spitballing here, it seems unfortunate to force a lowest common denominator on CsvExec, where it can't read byte ranges from files...
I will think about the FileStreamExec idea and discuss it with @metesynnada. We might be coming to a point where taking such a step makes sense. We will circle back once we have some clarity on our end.
@ozankabak and I agree that implementing FileStreamExec would be a logical choice. We plan on developing a proof of concept for it next week and sharing a design document.
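To make the boundary concrete, here is a rough sketch of what such an interface could look like. This is entirely hypothetical -- neither StreamingSource nor these method names exist in DataFusion:

```rust
use arrow::datatypes::SchemaRef;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use futures::stream::BoxStream;

/// Hypothetical interface: a source that is consumed strictly front to back.
/// Because it exposes no byte-range reads and no seeks, FIFO files fit
/// naturally, while CsvExec stays free to do ranged, parallel reads of
/// regular files.
trait StreamingSource: Send + Sync {
    /// The schema of the batches this source produces.
    fn schema(&self) -> SchemaRef;
    /// An incremental stream of record batches, read sequentially.
    fn stream(&self) -> BoxStream<'static, Result<RecordBatch, ArrowError>>;
}
```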
assert_eq!(c1.value(0), "1");
assert_eq!(c1.value(1), "0");
assert_eq!(c1.value(2), "1");
assert_eq!(c1.value(0), "true");
@@ -201,6 +201,7 @@ pub fn cell_to_string(col: &ArrayRef, row: usize) -> Result<String> {
Ok(NULL_STR.to_string())
} else {
match col.data_type() {
DataType::Null => Ok(NULL_STR.to_string()),
I'm not sure what the pyarrow CI failures are, but they don't appear to be related to this PR
I saw one TODO -- so far this PR looks epic. I look forward to completing my review in the morning.
@@ -1329,6 +1321,11 @@ impl ScalarValue {
self.to_array_of_size(1)
}

/// Converts a scalar into an arrow [`Scalar`]
pub fn to_scalar(&self) -> Scalar<ArrayRef> {
👍
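As an illustration of what this enables (my sketch, not code from this PR; it assumes arrow 46's Datum-based cmp kernels): a Scalar can be passed straight to a kernel and broadcast against an array, with no to_array_of_size expansion.

```rust
use std::sync::Arc;
use arrow::array::{ArrayRef, BooleanArray, Int32Array, Scalar};
use arrow::compute::kernels::cmp::eq;

fn main() {
    // `Scalar` wraps a one-element array and implements `Datum`, so the
    // kernel broadcasts it against the array side.
    let column: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let two = Scalar::new(Int32Array::from(vec![2]));
    let mask = eq(&column, &two).unwrap();
    assert_eq!(mask, BooleanArray::from(vec![false, true, false]));
}
```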
@@ -237,6 +237,7 @@ async fn sort_preserving_merge() {
}

#[tokio::test]
#[ignore] // TODO: Fix this
This still seems to be left as a TODO.
Oops, forgot about that 😅
@@ -581,7 +581,9 @@ fn make_dict_batches() -> Vec<RecordBatch> {
// ...
// 0000000002

let values: Vec<_> = (i..i + batch_size).map(|x| format!("{x:010}")).collect();
let values: Vec<_> = (i..i + batch_size)
    .map(|x| format!("{:010}", x / 16))
This change was necessary so that the dictionaries contain some repeated values. Without repetition, the RowConverter overheads come to dominate the memory usage, and testing the spill reservation becomes a moot point because the intermediate memory usage of the merge is too great.
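A quick sketch of what the division buys (the constants here are illustrative, not taken from the test):

```rust
fn main() {
    // Each run of 16 consecutive row indices now maps to the same formatted
    // value, so the dictionary contains repeats rather than one unique
    // entry per row.
    let values: Vec<String> = (0..32u64).map(|x| format!("{:010}", x / 16)).collect();
    assert_eq!(values[0], "0000000000");
    assert_eq!(values[0], values[15]); // repeated within a run of 16
    assert_eq!(values[16], "0000000001"); // a new dictionary value starts here
}
```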
Thank you @tustvold -- I went through this PR carefully and it looks (really) nice to me. The binary expression evaluation has bothered me for a long time, so thank you for tightening it up.
I also checked the size of the datafusion-cli binary (built via cargo build --release):

This branch:
$ du -h /Users/alamb/Software/target-df/release/datafusion-cli
72M /Users/alamb/Software/target-df/release/datafusion-cli

Main:
$ du -h /Users/alamb/Software/target-df2/release/datafusion-cli
85M /Users/alamb/Software/target-df2/release/datafusion-cli
or_kleene(&and(&left_is_null, &right_is_null)?, &eq)
}
_ => eq_dyn(left, right),
_ if null_equals_null => not_distinct(&left, &right),
FYI @Dandandan
Nice 👍
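For readers following along, a small sketch of the kernel's semantics (assuming arrow 46's cmp::not_distinct, which takes Datum arguments):

```rust
use arrow::array::{BooleanArray, Int32Array};
use arrow::compute::kernels::cmp::not_distinct;

fn main() {
    // NULL-aware equality: NULL is "not distinct from" NULL, which matches
    // the null_equals_null semantics the hand-rolled or_kleene/and/eq
    // combination used to implement.
    let left = Int32Array::from(vec![Some(1), None, Some(3)]);
    let right = Int32Array::from(vec![Some(1), None, Some(4)]);
    let result = not_distinct(&left, &right).unwrap();
    assert_eq!(result, BooleanArray::from(vec![true, true, false]));
}
```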
@@ -302,7 +302,7 @@ CREATE TABLE my_table(c1 float, c2 double, c3 boolean, c4 varchar) AS SELECT *,c
query RRBT rowsort
SELECT * FROM my_table order by c1 LIMIT 1
----
0.00001 0.000000000001 true 1
0.00001 0.000000000001 true true
Converting boolean to "true" seems like an improvement over 1 to me.
//! This module contains computation kernels that are eventually
//! destined for arrow-rs but are in datafusion until they are ported.

use arrow::{array::*, datatypes::ArrowNumericType};
Thank you so much @tustvold -- it's great to see this gone.
Ok(Arc::new(paste::expr! {[<$OP _binary>]}(&ll, &rr)?))
}};
}

/// Invoke a compute kernel on a data array and a scalar value
macro_rules! compute_utf8_op_scalar {
It seems like eventually we could make a ColumnarValue::to_arrow_datum type function that would allow direct invocation of many of these kernels from DataFusion without needing layers of dispatch.
Yes, ideally with #7353 we could make ScalarValue implement Datum directly.
And then implement Datum directly for ColumnarValue as well 🤔 that would be pretty sweet.
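A rough sketch of the shape being discussed (hypothetical; ColumnarValue and its scalar payload are simplified stand-ins so the example stays self-contained):

```rust
use arrow::array::{ArrayRef, Datum, Int32Array, Scalar};

// Simplified stand-in for DataFusion's ColumnarValue, just to show the bridge.
enum ColumnarValue {
    Array(ArrayRef),
    Scalar(i32), // DataFusion would carry a ScalarValue here
}

impl ColumnarValue {
    /// Hypothetical `to_arrow_datum`: both variants become a `Datum`, so
    /// Datum-based kernels can be invoked without per-type macro dispatch.
    fn to_arrow_datum(&self) -> Box<dyn Datum> {
        match self {
            ColumnarValue::Array(a) => Box::new(a.clone()),
            ColumnarValue::Scalar(v) => Box::new(Scalar::new(Int32Array::from(vec![*v]))),
        }
    }
}
```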
Which issue does this PR close?
Closes #7332
Rationale for this change
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?