-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Determine ordering of file groups #9593
Merged
Merged
Changes from 45 commits
Commits
Show all changes
52 commits
Select commit
Hold shift + click to select a range
7587a07
add statistics to PartitionedFile
suremarc 1e380b2
just dump work for now
suremarc 263453f
working test case
suremarc 5634bd7
fix jumbled rebase
suremarc 7428fe0
forgot to annotate #[test]
suremarc 4816343
more refactoring
suremarc c7be9e0
add a link
suremarc fc1a668
refactor again
suremarc 1c42e00
whitespace
suremarc 3446fed
format debug log
suremarc 3fe8558
remove useless itertools
suremarc 8ba4001
refactor test
suremarc 9c8729a
fix bug
suremarc 6df9832
use sort_file_groups in ListingTable
suremarc f855a8a
move check into a better place
suremarc 3e5263b
refactor test a bit
suremarc 5b7b307
more testing
suremarc 4761096
more testing
suremarc a95dffa
better error message
suremarc 1a66604
fix log msg
suremarc cca5f0f
fix again
suremarc e6e10e8
Merge remote-tracking branch 'origin/main' into statistics-planning
suremarc 8f7a2d7
add sqllogictest and fixes
suremarc e9fad54
fix test
suremarc e982f0f
Update datafusion/core/src/datasource/listing/mod.rs
suremarc cc9f144
Update datafusion/core/src/datasource/physical_plan/file_scan_config.rs
suremarc 95bb790
more unit tests
suremarc 0e60230
rename to split_groups_by_statistics
suremarc 9f375e8
only use groups if there's <= target_partitions
suremarc 3d9d293
refactor a bit, no need for projected_schema
suremarc 1366c99
fix reverse order
suremarc a29be69
Merge remote-tracking branch 'origin/main' into statistics-planning
suremarc 2ef8006
save work for now
suremarc b112c26
Merge branch 'main' into statistics-planning
suremarc 0153acf
lots of test cases in new slt
suremarc 4e03528
remove output check
suremarc 695e674
fix
suremarc ec4282b
fix last test
suremarc 1030b30
comment on params
suremarc 2f34684
clippy
suremarc 24c0bc5
revert parquet.slt
suremarc 61f883f
no need to pass projection separately
suremarc 9bc29cf
Update datafusion/core/src/datasource/listing/mod.rs
suremarc aa89433
update comment on in
suremarc d7fc78a
fix test?
suremarc f3a69e5
un-fix?
suremarc 1a010b7
add fix back in?
suremarc f41d1c9
move indices_sorted_by_min to MinMaxStatistics
suremarc 15e1339
move MinMaxStatistics to its own module
suremarc a2c9b4e
fix license
suremarc d7c9af6
add feature flag
suremarc 82166fd
update config
suremarc File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -739,16 +739,35 @@ impl TableProvider for ListingTable { | |
filters: &[Expr], | ||
limit: Option<usize>, | ||
) -> Result<Arc<dyn ExecutionPlan>> { | ||
let (partitioned_file_lists, statistics) = | ||
let (mut partitioned_file_lists, statistics) = | ||
self.list_files_for_scan(state, filters, limit).await?; | ||
|
||
let projected_schema = project_schema(&self.schema(), projection)?; | ||
|
||
// if no files need to be read, return an `EmptyExec` | ||
if partitioned_file_lists.is_empty() { | ||
let schema = self.schema(); | ||
let projected_schema = project_schema(&schema, projection)?; | ||
return Ok(Arc::new(EmptyExec::new(projected_schema))); | ||
} | ||
|
||
let output_ordering = self.try_create_output_ordering()?; | ||
match output_ordering.first().map(|output_ordering| { | ||
FileScanConfig::split_groups_by_statistics( | ||
&self.table_schema, | ||
&partitioned_file_lists, | ||
output_ordering, | ||
) | ||
}) { | ||
Some(Err(e)) => log::debug!("failed to split file groups by statistics: {e}"), | ||
Some(Ok(new_groups)) => { | ||
if new_groups.len() <= self.options.target_partitions { | ||
partitioned_file_lists = new_groups; | ||
} else { | ||
log::debug!("attempted to split file groups by statistics, but there were more file groups than target_partitions; falling back to unordered") | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 👍 |
||
} | ||
None => {} // no ordering required | ||
}; | ||
|
||
// extract types of partition columns | ||
let table_partition_cols = self | ||
.options | ||
|
@@ -772,6 +791,7 @@ impl TableProvider for ListingTable { | |
} else { | ||
return Ok(Arc::new(EmptyExec::new(Arc::new(Schema::empty())))); | ||
}; | ||
|
||
// create the execution plan | ||
self.options | ||
.format | ||
|
@@ -784,7 +804,7 @@ impl TableProvider for ListingTable { | |
statistics, | ||
projection: projection.cloned(), | ||
limit, | ||
output_ordering: self.try_create_output_ordering()?, | ||
output_ordering, | ||
table_partition_cols, | ||
}, | ||
filters.as_ref(), | ||
|
@@ -937,10 +957,11 @@ impl ListingTable { | |
// collect the statistics if required by the config | ||
let files = file_list | ||
.map(|part_file| async { | ||
let part_file = part_file?; | ||
let mut part_file = part_file?; | ||
if self.options.collect_stat { | ||
let statistics = | ||
self.do_collect_statistics(ctx, &store, &part_file).await?; | ||
part_file.statistics = Some(statistics.clone()); | ||
Ok((part_file, statistics)) as Result<(PartitionedFile, Statistics)> | ||
} else { | ||
Ok((part_file, Statistics::new_unknown(&self.file_schema))) | ||
|
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is actually a nice API for potentially providing pre-known statistics 👍