-
Notifications
You must be signed in to change notification settings - Fork 49
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP] Streaming iterator to fix the snapshot creation issue #515
base: main
Are you sure you want to change the base?
Conversation
Implements a streaming iterator to group Delta log files by version number. Key features: - Groups commit logs and checkpoints by version - Assumes input is pre-sorted by version - Supports streaming iteration for memory efficiency - Includes comprehensive test suite The implementation handles various Delta log file types including: - Regular commit logs (.json) - Single checkpoints (.parquet) - Multi-part checkpoints
Adds initial test infrastructure for the version grouping functionality: - Helper functions for test data setup - Basic test case for single commit file grouping - Documentation for test utilities
Tests the core functionality of VersionGroupingIterator with multiple sequential versions. Verifies that commit files are correctly grouped and ordered from version 1->2->3. explaining the Delta protocol requirements.
- Single/multi-part checkpoint handling - Mixed version and file type combinations - Empty iterator case
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice start. Very thorough tests.
Is the plan to incorporate it into actual log listing as part of this PR? Or as follow-up work?
kernel/src/grouping_iterator.rs
Outdated
// 3. [5] | ||
pub struct VersionGroup<L: AsUrl = FileMeta> { | ||
version: u64, | ||
files: Vec<ParsedLogPath<L>>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just verifying, the following assertion should always hold?
assert!(g.files.iter().all(|f| f.version == g.version))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't understand this fully. Could you elaborate @scovich?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, that was cryptic. VersionGroup
contains a version
field, and also references a bunch of ParsedLogPath
that each have their own version
field. All those versions should always be the same, correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Correct, the version field in each of the ParsedLogpath should match the version in the VersionGroup.
kernel/src/grouping_iterator.rs
Outdated
pub struct VersionGroupingIterator<L: AsUrl = FileMeta> { | ||
files: Peekable<Box<dyn Iterator<Item = ParsedLogPath<L>>>>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Any particular reason to favor Box<dyn>
over just making it fully templated?
pub struct VersionGroupingIterator<L: AsUrl = FileMeta> { | |
files: Peekable<Box<dyn Iterator<Item = ParsedLogPath<L>>>>, | |
pub struct VersionGroupingIterator<I, L = FileMeta> | |
where | |
L: AsUrl, | |
I: Iterator<Item = ParsedLogPath<L>>, | |
{ | |
files: Peekable<I>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As mentioned in the comment, here's my understanding,
I used Box<dyn>
deliberately to support arbitrary iterator types that yield ParsedLogPath
, rather than fixating upon a concrete compile time type to yield the ParsedLogPath
. .
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's normal for rust code to use templates for that -- the user doesn't normally see or care about the template type because it's obtained by calling helper functions; if the user ever decides they need a generic type they can always cast it as a &dyn Foo
or store it in a Box<dyn Foo>
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it, makes sense.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can already see this leads to much cleaner and simpler code.
kernel/src/grouping_iterator.rs
Outdated
"commit" => format!("{:020}.json", version), | ||
"checkpoint" => format!("{:020}.checkpoint.parquet", version), | ||
"multipart1" => format!("{:020}.checkpoint.0000000001.0000000002.parquet", version), | ||
"multipart2" => format!("{:020}.checkpoint.0000000002.0000000002.parquet", version), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe just pass in a LogPathFileType
directly, since each variant contains exactly the necessary info?
@zachschuermann -- it seems like we recently encountered another place that would have benefitted from a utility that creates log path file names like this? Does that ring any bells?
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #515 +/- ##
=======================================
Coverage ? 80.33%
=======================================
Files ? 62
Lines ? 13471
Branches ? 13471
=======================================
Hits ? 10822
Misses ? 2098
Partials ? 551 ☔ View full report in Codecov by Sentry. 🚨 Try these New Features:
|
@scovich In this PR, ill just focus on the building blocks for grouping iterator and will raise another one which will use the building blocks. |
Changes: - Remove redundant type annotation for version variable - Fix duplicate #[test] attribute in test_single_commit
Changes: - Replace Box<dyn Iterator> with generic type parameter I in VersionGroupingIterator - Add type parameters I and L with appropriate trait bounds using where clauses - Remove 'static bounds as they're no longer needed without Box<dyn> - Simplify From implementation to use concrete iterator type directly This change: 1. Improves performance by avoiding dynamic dispatch 2. Makes the code more idiomatic by using generics 3. Maintains flexibility while being more efficient 4. Removes unnecessary boxing of iterators
Changes: 1. Remove redundant type annotations for VersionGroupingIterator 2. Reorder imports alphabetically 3. Fix formatting and whitespace consistency 4. Add test for checkpoint detection across versions
// I: The concrete iterator type that yields ParsedLogPath<L> | ||
// L: The type implementing AsUrl that represents the underlying file location | ||
// This allows for flexible iteration over log files while maintaining type safety | ||
pub struct VersionGroupingIterator<I, L> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Depending on usage, it may make sense to do <L, I>
if L
ever needs to be specified directly. We can always change it later as the need arises tho.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Interesting... rust seems able to infer generic parameters regardless of the order they're declared in? The unit tests already do e.g.
let mut iter: VersionGroupingIterator<Url> = VersionGroupingIterator::from(paths.into_iter());
let files = value; | ||
VersionGroupingIterator { files: files.peekable() } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let files = value; | |
VersionGroupingIterator { files: files.peekable() } | |
VersionGroupingIterator { files: value.peekable() } |
or
let files = value; | |
VersionGroupingIterator { files: files.peekable() } | |
let files = value.peekable(); | |
VersionGroupingIterator { files } |
// if it does, we add it to the current group | ||
// if it doesn't, we return the current group and start a new one | ||
// this is why we need to assume that the input iterator is already sorted by version, because we only check the next file | ||
while let Some(parsed_logpath) = self.files.next_if(|v| v.version == version) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
while let Some(parsed_logpath) = self.files.next_if(|v| v.version == version) { | |
while let Some(parsed_logpath) = self.files.next_if(|f| f.version == version) { |
type Item = VersionGroup<L>; | ||
|
||
fn next(&mut self) -> Option<Self::Item> { | ||
while let Some(logpath) = self.files.next() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This isn't actually a loop.. it executes at most once. Which means we can simplify (un-indent) the code by:
let logpath = self.files.next()?;
let version = logpath.version;
...
Some(VersionGroup { version, files })
impl<I, L> From<I> for VersionGroupingIterator<I,L> | ||
where | ||
L: AsUrl, | ||
I: Iterator<Item = ParsedLogPath<L>>, | ||
{ | ||
fn from(value: I) -> Self { | ||
let files = value; | ||
VersionGroupingIterator { files: files.peekable() } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Based on a playground experiment, it looks like we make make this even easier to use by:
impl<I, L> From<I> for VersionGroupingIterator<I,L> | |
where | |
L: AsUrl, | |
I: Iterator<Item = ParsedLogPath<L>>, | |
{ | |
fn from(value: I) -> Self { | |
let files = value; | |
VersionGroupingIterator { files: files.peekable() } | |
impl<I, L> From<I> for VersionGroupingIterator<I::IntoIter, L> | |
where | |
L: AsUrl, | |
I: IntoIterator<Item = ParsedLogPath<L>>, | |
{ | |
fn from(value: I) -> Self { | |
VersionGroupingIterator { files: value.into_iter().peekable() } |
... and then the unit tests can just do e.g.
let mut iter: VersionGroupingIterator<Url> = VersionGroupingIterator::from(paths);
let paths = vec![create_log_path(1, "commit")]; | ||
let mut iter: VersionGroupingIterator<Url> = VersionGroupingIterator::from(paths.into_iter()); | ||
|
||
if let Some(group) = iter.next() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not just
if let Some(group) = iter.next() { | |
let group = iter.next().unwrap(); |
or at least
if let Some(group) = iter.next() { | |
let group = iter.next().expect("Expected a group"); |
(several more below)
#[test] | ||
fn test_single_commit() { | ||
let paths = vec![create_log_path(1, "commit")]; | ||
let mut iter: VersionGroupingIterator<Url> = VersionGroupingIterator::from(paths.into_iter()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
out of curiosity -- is the type annotation actually needed?
Even if it is needed, we should be able to use the derived Into
:
let mut iter: VersionGroupingIterator<Url> = VersionGroupingIterator::from(paths.into_iter()); | |
let mut iter: VersionGroupingIterator<Url> = paths.into_iter().into(); |
// check if the given version of the snapshot contains checkpoint file | ||
impl<L: AsUrl> VersionGroup<L> { | ||
pub fn contains_checkpoint(&self) -> bool { | ||
self.files.iter().any(|f| f.is_checkpoint()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
aside: It's a little brittle if the set of recognized/kept files changes, but the checkpoint is always the first out of the known file types. So we can do:
self.files.iter().any(|f| f.is_checkpoint()) | |
self.files.first().is_some_and(|f| f.is_checkpoint()) |
(but the iterator will anyway terminate on the first entry so this is probably over-optimizing)
let files = value; | ||
VersionGroupingIterator { | ||
files: files.peekable(), | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cargo fmt
will force each value to its own line unless they're simple identifiers, so it's worth pulling everything out to the helper variable:
let files = value; | |
VersionGroupingIterator { | |
files: files.peekable(), | |
} | |
let files = value.peekable(); | |
VersionGroupingIterator { files } |
Fix for #323.
Opening up for early preview. Closing the old PR #322