TableChangesScan::execute and end to end testing for CDF #580

Merged: 75 commits into delta-io:main, Dec 11, 2024

Conversation

@OussamaSaoudi-db (Collaborator) commented Dec 9, 2024

What changes are proposed in this pull request?

This PR introduces the execute method to TableChangesScan, which performs a CDF scan and returns the change data feed as ScanResults. Each ScanResult holds engine data and a selection vector to apply to the rows of that data.
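As a rough sketch of how an engine might consume this (the module path, the raw_data field, and the raw_mask accessor below are assumptions about the kernel API, not code from this PR):

    use std::sync::Arc;
    use delta_kernel::table_changes::scan::TableChangesScan;
    use delta_kernel::{DeltaResult, Engine};

    fn drain_cdf(scan: &TableChangesScan, engine: Arc<dyn Engine>) -> DeltaResult<()> {
        for result in scan.execute(engine)? {
            let result = result?;
            // Selection vector: rows marked false (e.g. masked out by a
            // deletion vector) must be dropped before surfacing the batch.
            let mask = result.raw_mask().cloned();
            // Engine data for one batch of the change data feed.
            let data = result.raw_data?;
            let _ = (data, mask); // hand off to the engine here
        }
        Ok(())
    }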

A helper method read_scan_file is added to read the rows of a scan_file and perform the physical-to-logical transformation.

The macros sort_lines and assert_batches_sorted_eq are moved to the common crate so they can be shared between the read.rs and the new cdf.rs integration tests.

The cdf-table and cdf-table-non-partitioned test tables are from the delta-rs project.

How was this change tested?

This tests data reading for 3 tables with the following features:

  • a table with deletion vectors involving deletion and restoration
  • a non-partitioned table
  • a partitioned table

@OussamaSaoudi-db (Collaborator Author)

TODO: rename table-with-cdf-and-dv to cdf-table-with-dv

@zachschuermann (Collaborator) left a comment

flushing comments, i'll let you rebase now that the other PR landed

Comment on lines +268 to +269
last_modified: 0,
size: 0,
Collaborator

hm, perhaps we should store these in CdfScanFile? make a follow-up if yes?

Collaborator Author

We discussed this before and the conclusion was that these may someday be useful, but immediately they don't contribute much.

iirc, last_modified may be handy because it could be used to check cache staleness in the engine in case it saves a file in memory without re-reading it for each scan.

Collaborator

right: so I think if the decision is to keep them in FileMeta, then we should have a follow-up to actually use them here and enable further optimizations downstream (without this, downstream optimizations can't make use of them)


let physical_to_logical_expr =
    physical_to_logical_expr(&scan_file, global_state.logical_schema.as_ref(), all_fields)?;
let read_schema = scan_file_read_schema(&scan_file, global_state.read_schema.as_ref());
Collaborator

is this physical_schema? maybe stick to physical/logical naming?

Collaborator Author

Ya sure 👍 In that case, we should probably do some renaming in global scan state and scan/mod.rs at some point.

Collaborator Author

Address this further in this PR too: https://github.com/delta-io/delta-kernel-rs/pull/588/files

Collaborator

cool thanks!

Comment on lines 272 to 275
let read_result_iter =
    engine
        .get_parquet_handler()
        .read_parquet_files(&[file], read_schema, predicate)?;
Collaborator

(don't have to solve now) I wonder if we should introduce this higher in the iterator pipeline we've built up here?

Oversimplification, but instead of something like:

scan_files
    .map(|scan_file| read_parquet_files([scan_file])) // parquet batches
    .map(|batch| do_stuff) // return final batches

could push it up to be

read_parquet_files(scan_files)  // parquet batches
    .map(|batch| do_stuff) // return final batches

Collaborator

unsure how much state you'd need to track and associate with each batch (if it makes things too complicated)

Collaborator Author

Hmm we'd have to somehow associate the scan_file and selection_vector to the batch because it's important for the P2L transformation. So the type of the proposed read_parquet_files would have to be something like Iterator<(EngineData, ResolvedCdfScanFile)>.
and do_stuff: (EngineData, ResolvedCdfScanFile) -> ScanResult
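Sketched with stand-in unit types, that shape might look roughly like the following (illustrative only; read_parquet_files_with_context is hypothetical and the real kernel types are far richer):

    // Stand-ins for the real kernel types.
    struct ResolvedCdfScanFile;
    struct EngineData;
    struct ScanResult;

    // Hypothetical batched reader: one call over all scan files, with each
    // batch still paired with the scan file it came from so the later
    // physical-to-logical step keeps its per-file context.
    fn read_parquet_files_with_context(
        scan_files: impl Iterator<Item = ResolvedCdfScanFile>,
    ) -> impl Iterator<Item = (EngineData, ResolvedCdfScanFile)> {
        scan_files.map(|f| (EngineData, f))
    }

    // do_stuff: (EngineData, ResolvedCdfScanFile) -> ScanResult, as above.
    fn do_stuff((data, scan_file): (EngineData, ResolvedCdfScanFile)) -> ScanResult {
        let _ = (data, scan_file);
        ScanResult
    }

    fn pipeline(
        scan_files: impl Iterator<Item = ResolvedCdfScanFile>,
    ) -> impl Iterator<Item = ScanResult> {
        read_parquet_files_with_context(scan_files).map(do_stuff)
    }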

Collaborator

oh i posted another comment about this, but maybe just make an issue to invest a bit more time into this

@@ -160,7 +160,7 @@ impl ScanResult {
/// store the name of the column, as that's all that's needed during the actual query. For
/// `Partition` we store an index into the logical schema for this query since later we need the
/// data type as well to materialize the partition column.
-#[derive(PartialEq, Debug)]
+#[derive(Clone, PartialEq, Debug)]
Collaborator

Clone needed because now we process the selected column types separately for each commit, where the normal log replay scan does it all in one shot?

Collaborator Author

Ah good point. The reason is really that I don't want to hold a reference to all_fields in TableChangesScan because the iterator needs to be free from lifetimes.

But really all I need is to Arc it. Now we don't need to clone the Vec.

Note that the existing Scan::execute still borrows self

    pub fn execute(
        &self,
        engine: Arc<dyn Engine>,
    ) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanResult>> + '_> {

as you can see with that anonymous lifetime '_. I was planning on opening a followup PR to take that out too.
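For reference, the Arc-based, lifetime-free shape described here might look roughly like this (a simplified sketch with stand-in types, not the exact code in the PR):

    use std::sync::Arc;

    // Simplified stand-in for ColumnType: Selected stores a column name,
    // Partition stores an index into the logical schema (per the doc comment).
    #[derive(Clone, PartialEq, Debug)]
    enum ColumnType {
        Selected(String),
        Partition(usize),
    }

    struct TableChangesScan {
        // An Arc lets the iterator returned by `execute` own a cheap handle
        // instead of borrowing `&self`, so no `'_` lifetime leaks into the
        // return type and the Vec contents are never deep-cloned.
        all_fields: Arc<[ColumnType]>,
    }

    impl TableChangesScan {
        fn execute(&self) -> impl Iterator<Item = ColumnType> + 'static {
            let all_fields = self.all_fields.clone(); // refcount bump only
            (0..all_fields.len()).map(move |i| all_fields[i].clone())
        }
    }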

@@ -29,8 +33,6 @@ pub struct TableChangesScan {
predicate: Option<ExpressionRef>,
// The [`ColumnType`] of all the fields in the `logical_schema`
all_fields: Vec<ColumnType>,
-// `true` if any column in the `logical_schema` is a partition column
-have_partition_cols: bool,
Collaborator

What is special about CDF that we don't need to care about partition columns?
Or did you find a better approach that we should consider backporting to the normal scan as well?

Collaborator Author

have_partition_cols was useful in scan to skip building the physical-to-logical expression (see source). For CDF, we always have to construct an expression because it changes based on the scan file.
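To make "changes based on the scan file" concrete: CDF injects generated columns such as _change_type, and the value of that column depends on whether the batch came from an add, remove, or cdc file, so the projection expression cannot be built once up front. A toy illustration (stand-in enum, not the kernel's Expression API):

    // Stand-in for the kind of file a CDF scan batch originates from.
    enum CdfScanFileType {
        Add,    // data file added in the commit: rows are inserts
        Remove, // data file removed in the commit: rows are deletes
        Cdc,    // change-data file: _change_type is already in the data
    }

    // The generated _change_type column differs per scan file, which is why
    // the physical-to-logical expression is rebuilt for every file.
    fn change_type_source(scan_type: &CdfScanFileType) -> &'static str {
        match scan_type {
            CdfScanFileType::Add => "literal 'insert'",
            CdfScanFileType::Remove => "literal 'delete'",
            CdfScanFileType::Cdc => "read _change_type from the data",
        }
    }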

let table_root = self.table_changes.table_root().clone();
let all_fields = self.all_fields.clone();
let predicate = self.predicate.clone();
let dv_engine_ref = engine.clone();
Collaborator

interesting... we need a separate arc for each of the two map calls, because they co-exist in time and they both outlive this function call.

Collaborator Author

Yeah I was surprised to see that too. I suppose an arc clone is cheap enough given all the processing we're doing.

Collaborator

Yeah, I don't think there's anything wrong -- it was just the first time I'd seen that and was trying to grok it

kernel/src/table_changes/scan.rs (resolved review thread, collapsed)
Comment on lines 38 to 42
if let Some(mask) = mask {
    Ok(filter_record_batch(&record_batch, &mask.into())?)
} else {
    Ok(record_batch)
}
Collaborator

nit: I'm always torn which syntax to use when each case would be a single line either way:

Suggested change
-if let Some(mask) = mask {
-    Ok(filter_record_batch(&record_batch, &mask.into())?)
-} else {
-    Ok(record_batch)
-}
+match mask {
+    Some(mask) => Ok(filter_record_batch(&record_batch, &mask.into())?),
+    None => Ok(record_batch),
+}

Collaborator Author

I think I'd lean towards the match since it feels clearer that there are only two cases.

codecov bot commented Dec 10, 2024

Codecov Report

Attention: Patch coverage is 89.58333% with 10 lines in your changes missing coverage. Please review.

Project coverage is 83.44%. Comparing base (af075a8) to head (192849d).
Report is 1 commit behind head on main.

Files with missing lines             Patch %    Lines
kernel/src/table_changes/scan.rs     88.63%     1 Missing and 9 partials ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #580      +/-   ##
==========================================
+ Coverage   83.21%   83.44%   +0.23%     
==========================================
  Files          74       74              
  Lines       16775    16861      +86     
  Branches    16775    16861      +86     
==========================================
+ Hits        13959    14070     +111     
+ Misses       2168     2130      -38     
- Partials      648      661      +13     


@zachschuermann (Collaborator) left a comment

LGTM woooo!!! but don't think tests are exhaustive (probably need to test some of the filtering/projection/etc. and i'll need to think through more edge cases) but let's merge and take on some more testing soon!

    location,
};
let read_result_iter = engine.get_parquet_handler().read_parquet_files(
    &[file],
Collaborator

regarding that other conversation maybe we just make a follow up to consider pulling read_parquet_files higher in the iterator stack to do better than single-file calls?

/// unpack the test data from {test_parent_dir}/{test_name}.tar.zst into a temp dir, and return the dir it was
/// unpacked into
#[allow(unused)]
pub(crate) fn load_test_data(
Collaborator

nice thanks!
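One possible implementation of that helper (a sketch assuming the tar, zstd, and tempfile crates; the code actually added in this PR may differ):

    use std::{error::Error, fs::File, path::Path};

    /// Unpack {test_parent_dir}/{test_name}.tar.zst into a fresh temp dir and
    /// return it (the directory is deleted when the TempDir is dropped).
    pub(crate) fn load_test_data(
        test_parent_dir: &str,
        test_name: &str,
    ) -> Result<tempfile::TempDir, Box<dyn Error>> {
        let path = Path::new(test_parent_dir).join(format!("{test_name}.tar.zst"));
        let decoder = zstd::Decoder::new(File::open(path)?)?;
        let mut archive = tar::Archive::new(decoder);
        let dir = tempfile::tempdir()?;
        archive.unpack(dir.path())?;
        Ok(dir)
    }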

Ok(batches)
}

fn assert_batches_sorted_eq(expected_lines: &[impl ToString], batches: &[RecordBatch]) {
Collaborator

is this duplicated? can we share code or just create follow up to clean this up soon?

Collaborator Author

Ah good catch. Moved it to common and updated tests. We still passing 😎
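For reference, the shared helper follows the usual pattern of sorting pretty-printed data rows before comparing (a sketch assuming arrow's pretty_format_batches; the exact form in the common crate may differ):

    use arrow::record_batch::RecordBatch;
    use arrow::util::pretty::pretty_format_batches;

    /// Compare pretty-printed batches against the expected lines, sorting the
    /// data rows on both sides first so tests don't depend on row order.
    pub fn assert_batches_sorted_eq(expected_lines: &[impl ToString], batches: &[RecordBatch]) {
        // Keep the top border, header row, header border, and bottom border in
        // place; sort only the data rows in between.
        let mut expected: Vec<String> = expected_lines.iter().map(|l| l.to_string()).collect();
        let n = expected.len();
        if n > 4 {
            expected[3..n - 1].sort_unstable();
        }

        let formatted = pretty_format_batches(batches).unwrap().to_string();
        let mut actual: Vec<String> = formatted.lines().map(|s| s.to_string()).collect();
        let m = actual.len();
        if m > 4 {
            actual[3..m - 1].sort_unstable();
        }

        assert_eq!(expected, actual);
    }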

@OussamaSaoudi-db merged commit 7bcbb57 into delta-io:main on Dec 11, 2024
20 checks passed
@zachschuermann removed the breaking-change label (Change that will require a version bump) on Dec 14, 2024