TableChangesScan::execute and end-to-end testing for CDF #580
```diff
@@ -2,18 +2,22 @@ use std::sync::Arc;
 
 use itertools::Itertools;
 use tracing::debug;
 use url::Url;
 
 use crate::actions::deletion_vector::split_vector;
 use crate::scan::state::GlobalScanState;
-use crate::scan::ColumnType;
+use crate::scan::{ColumnType, ScanResult};
 use crate::schema::{SchemaRef, StructType};
-use crate::{DeltaResult, Engine, ExpressionRef};
+use crate::{DeltaResult, Engine, ExpressionRef, FileMeta};
 
 use super::log_replay::{table_changes_action_iter, TableChangesScanData};
+use super::physical_to_logical::{physical_to_logical_expr, scan_file_read_schema};
+use super::resolve_dvs::{resolve_scan_file_dv, ResolvedCdfScanFile};
+use super::scan_file::scan_data_to_scan_file;
 use super::{TableChanges, CDF_FIELDS};
 
 /// The result of building a [`TableChanges`] scan over a table. This can be used to get a change
 /// data feed from the table
 #[allow(unused)]
 #[derive(Debug)]
 pub struct TableChangesScan {
     // The [`TableChanges`] that specifies this scan's start and end versions
```
```diff
@@ -29,8 +33,6 @@ pub struct TableChangesScan {
     predicate: Option<ExpressionRef>,
     // The [`ColumnType`] of all the fields in the `logical_schema`
     all_fields: Vec<ColumnType>,
-    // `true` if any column in the `logical_schema` is a partition column
-    have_partition_cols: bool,
 }
 
 /// This builder constructs a [`TableChangesScan`] that can be used to read the [`TableChanges`]
```

**Review thread** on the removed `have_partition_cols` field:

- What is special about CDF that we don't need to care about partition columns?
```diff
@@ -115,7 +117,6 @@ impl TableChangesScanBuilder {
         let logical_schema = self
             .schema
             .unwrap_or_else(|| self.table_changes.schema.clone().into());
-        let mut have_partition_cols = false;
         let mut read_fields = Vec::with_capacity(logical_schema.fields.len());
 
         // Loop over all selected fields. We produce the following:
```
```diff
@@ -138,7 +139,6 @@
                     // Store the index into the schema for this field. When we turn it into an
                     // expression in the inner loop, we will index into the schema and get the name and
                     // data type, which we need to properly materialize the column.
-                    have_partition_cols = true;
                     Ok(ColumnType::Partition(index))
                 } else if CDF_FIELDS
                     .iter()
```
```diff
@@ -164,7 +164,6 @@
             logical_schema,
             predicate: self.predicate,
             all_fields,
-            have_partition_cols,
             physical_schema: StructType::new(read_fields).into(),
         })
     }
```
```diff
@@ -176,7 +175,6 @@ impl TableChangesScan {
     /// necessary to read CDF. Additionally, [`TableChangesScanData`] holds metadata on the
     /// deletion vectors present in the commit. The engine data in each scan data is guaranteed
     /// to belong to the same commit. Several [`TableChangesScanData`] may belong to the same commit.
-    #[allow(unused)]
     fn scan_data(
         &self,
         engine: Arc<dyn Engine>,
```
```diff
@@ -192,7 +190,6 @@
 
     /// Get global state that is valid for the entire scan. This is somewhat expensive so should
     /// only be called once per scan.
-    #[allow(unused)]
     fn global_scan_state(&self) -> GlobalScanState {
         let end_snapshot = &self.table_changes.end_snapshot;
         GlobalScanState {
```
```diff
@@ -203,6 +200,99 @@
             column_mapping_mode: end_snapshot.column_mapping_mode,
         }
     }
+
+    /// Perform an "all in one" scan to get the change data feed. This will use the provided `engine`
+    /// to read and process all the data for the query. Each [`ScanResult`] in the resultant iterator
+    /// encapsulates the raw data and an optional boolean vector built from the deletion vector if it
+    /// was present. See the documentation for [`ScanResult`] for more details.
+    pub fn execute(
+        &self,
+        engine: Arc<dyn Engine>,
+    ) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanResult>>> {
+        let scan_data = self.scan_data(engine.clone())?;
+        let scan_files = scan_data_to_scan_file(scan_data);
+
+        let global_scan_state = self.global_scan_state();
+        let table_root = self.table_changes.table_root().clone();
+        let all_fields = self.all_fields.clone();
+        let predicate = self.predicate.clone();
+        let dv_engine_ref = engine.clone();
```
**Review thread** on `let dv_engine_ref = engine.clone();`:

- interesting... we need a separate arc for each of the two map calls, because they co-exist in time and they both outlive this function call.
- Yeah, I was surprised to see that too. I suppose an arc clone is cheap enough given all the processing we're doing.
- Yeah, I don't think there's anything wrong -- it was just the first time I'd seen that and was trying to grok it.
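To make the reviewers' point concrete, here is a minimal standalone sketch (hypothetical types, not kernel code) of why each of the two `move` closures needs its own `Arc` handle: both closures live inside the returned iterator and outlive the function, so neither can borrow from its stack frame, and a single owned handle could only be moved into one of them.

```rust
use std::sync::Arc;

fn build_pipeline(engine: Arc<String>) -> impl Iterator<Item = usize> {
    // Extra handle for the first closure; the second keeps the original.
    let engine_for_resolve = Arc::clone(&engine);
    (0..3usize)
        .map(move |i| engine_for_resolve.len() + i) // owns `engine_for_resolve`
        .map(move |n| n * engine.len()) // owns the original `engine`
}

fn main() {
    let out: Vec<usize> = build_pipeline(Arc::new("engine".to_string())).collect();
    assert_eq!(out, vec![36, 42, 48]); // ("engine".len() + i) * "engine".len()
}
```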
```diff
+
+        let result = scan_files
+            .map(move |scan_file| {
+                resolve_scan_file_dv(dv_engine_ref.as_ref(), &table_root, scan_file?)
+            }) // Iterator-Result-Iterator
+            .flatten_ok() // Iterator-Result
+            .map(move |resolved_scan_file| -> DeltaResult<_> {
+                read_scan_file(
+                    engine.as_ref(),
+                    resolved_scan_file?,
+                    &global_scan_state,
+                    &all_fields,
+                    predicate.clone(),
+                )
+            }) // Iterator-Result-Iterator-Result
+            .flatten_ok() // Iterator-Result-Result
+            .map(|x| x?); // Iterator-Result
+
+        Ok(result)
+    }
 }
```
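The `// Iterator-Result-…` breadcrumbs above track the shape that each `flatten_ok` collapses. A self-contained sketch of the same pattern with toy data (assuming only the `itertools` crate, which this file already imports):

```rust
use itertools::Itertools;

fn main() {
    // Outer items are fallible "files"; each Ok carries an iterator of rows,
    // i.e. the Iterator-Result-Iterator shape from the pipeline above.
    let nested: Vec<Result<Vec<i32>, String>> =
        vec![Ok(vec![1, 2]), Err("bad file".to_string()), Ok(vec![3])];

    // flatten_ok flattens each Ok into its items and passes every Err
    // through unchanged, leaving the plain Iterator-Result shape.
    let flat: Vec<Result<i32, String>> = nested.into_iter().flatten_ok().collect();

    assert_eq!(flat, vec![Ok(1), Ok(2), Err("bad file".to_string()), Ok(3)]);
}
```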
```diff
+
+/// Reads the data at the `resolved_scan_file` and transforms the data from physical to logical.
+/// The result is a fallible iterator of [`ScanResult`] containing the logical data.
+fn read_scan_file(
+    engine: &dyn Engine,
+    resolved_scan_file: ResolvedCdfScanFile,
+    global_state: &GlobalScanState,
+    all_fields: &[ColumnType],
+    predicate: Option<ExpressionRef>,
+) -> DeltaResult<impl Iterator<Item = DeltaResult<ScanResult>>> {
+    let ResolvedCdfScanFile {
+        scan_file,
+        mut selection_vector,
+    } = resolved_scan_file;
+
+    let physical_to_logical_expr =
+        physical_to_logical_expr(&scan_file, global_state.logical_schema.as_ref(), all_fields)?;
+    let read_schema = scan_file_read_schema(&scan_file, global_state.read_schema.as_ref());
```
**Review thread** on `read_schema`:

- is this physical_schema? maybe stick to physical/logical naming?
- Ya, sure 👍 In that case, we should probably do some renaming in global scan state and …
- Address this further in this PR too: https://github.com/delta-io/delta-kernel-rs/pull/588/files
- cool thanks!
```diff
+    let phys_to_logical_eval = engine.get_expression_handler().get_evaluator(
+        read_schema.clone(),
+        physical_to_logical_expr,
+        global_state.logical_schema.clone().into(),
+    );
+
+    let table_root = Url::parse(&global_state.table_root)?;
+    let location = table_root.join(&scan_file.path)?;
+    let file = FileMeta {
+        last_modified: 0,
+        size: 0,
```
**Review thread** on lines +289 to +290 (the zeroed `last_modified` and `size` in `FileMeta`):

- hm, perhaps we should store these in …
- We discussed this before and the conclusion was that these may someday be useful, but immediately they don't contribute much. iirc, `last_modified` may be handy because it could be used to check cache staleness in the engine in case it saves a file in memory without re-reading it for each scan.
- right: so I think if the decision is to keep them in `FileMeta` then we should have a follow-up to actually use them here (and enable any further optimizations downstream - without this, no matter the downstream optimizations, they won't be used)
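As an aside, a sketch of the cache-staleness idea mentioned in the thread (an entirely hypothetical engine-side cache; `FileCache` and `get_if_fresh` are illustrative names, not kernel or engine API):

```rust
use std::collections::HashMap;

/// Hypothetical engine-side cache keyed by file location. An entry is reused
/// only if the `last_modified` handed over in the kernel's `FileMeta` matches
/// the one recorded when the file was cached.
struct FileCache {
    entries: HashMap<String, (i64, Vec<u8>)>, // location -> (last_modified, bytes)
}

impl FileCache {
    fn get_if_fresh(&self, location: &str, last_modified: i64) -> Option<&[u8]> {
        match self.entries.get(location) {
            // A zeroed last_modified (as in the scan code above) can never
            // attest freshness, so it effectively bypasses the cache.
            Some((mtime, bytes)) if *mtime == last_modified && last_modified != 0 => Some(bytes),
            _ => None,
        }
    }
}

fn main() {
    let mut cache = FileCache { entries: HashMap::new() };
    cache
        .entries
        .insert("part-0.parquet".into(), (1_700_000_000, vec![1, 2, 3]));
    assert!(cache.get_if_fresh("part-0.parquet", 1_700_000_000).is_some());
    assert!(cache.get_if_fresh("part-0.parquet", 0).is_none()); // zeroed mtime
}
```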
```diff
+        location,
+    };
+    let read_result_iter =
+        engine
+            .get_parquet_handler()
+            .read_parquet_files(&[file], read_schema, predicate)?;
```
**Review thread** on `read_parquet_files`:

- (don't have to solve now) I wonder if we should introduce this higher in the iterator pipeline we've built up here? Oversimplification, but instead of something like:

  ```rust
  scan_files
      .map(|scan_file| read_parquet_files([scan_file])) // parquet batches
      .map(|batch| do_stuff) // return final batches
  ```

  we could push it up to be:

  ```rust
  read_parquet_files(scan_files) // parquet batches
      .map(|batch| do_stuff) // return final batches
  ```

- unsure how much state you'd need to track and associate with each batch (if it makes things too complicated)
- Hmm, we'd have to somehow associate the …
- oh i posted another comment about this, but maybe just make an issue to invest a bit more time into this
```diff
+
+    let result = read_result_iter.map(move |batch| -> DeltaResult<_> {
+        let batch = batch?;
+        // to transform the physical data into the correct logical form
+        let logical = phys_to_logical_eval.evaluate(batch.as_ref());
+        let len = logical.as_ref().map_or(0, |res| res.len());
+        // need to split the dv_mask. what's left in dv_mask covers this result, and rest
+        // will cover the following results. we `take()` out of `selection_vector` to avoid
+        // trying to return a captured variable. We're going to reassign `selection_vector`
+        // to `rest` in a moment anyway
+        let mut sv = selection_vector.take();
+        let rest = split_vector(sv.as_mut(), len, None);
+        let result = ScanResult {
+            raw_data: logical,
+            raw_mask: sv,
+        };
+        selection_vector = rest;
+        Ok(result)
+    });
+    Ok(result)
+}
 
 #[cfg(test)]
```
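The selection-vector bookkeeping in the closure above carves one deletion-vector mask into per-batch pieces. A minimal sketch of the idea (a hypothetical `split_mask` helper; the kernel's actual `split_vector` has a different signature):

```rust
/// Keep the first `len` entries of the mask for the current batch and return
/// whatever is left over for the batches that follow.
fn split_mask(mask: &mut Option<Vec<bool>>, len: usize) -> Option<Vec<bool>> {
    match mask.take() {
        Some(mut head) if head.len() > len => {
            let rest = head.split_off(len); // tail: rows in later batches
            *mask = Some(head); // head: rows in the current batch
            Some(rest)
        }
        // Mask is missing or already shorter than the batch: nothing carries over.
        other => {
            *mask = other;
            None
        }
    }
}

fn main() {
    let mut mask = Some(vec![true, false, true, true]);
    let rest = split_mask(&mut mask, 2);
    assert_eq!(mask, Some(vec![true, false])); // covers the current batch
    assert_eq!(rest, Some(vec![true, true])); // carried over to the next one
}
```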
```diff
@@ -238,7 +328,6 @@ mod tests {
             ]
         );
         assert_eq!(scan.predicate, None);
-        assert!(!scan.have_partition_cols);
     }
 
     #[test]
```
```diff
@@ -276,7 +365,6 @@
             ])
             .into()
         );
-        assert!(!scan.have_partition_cols);
         assert_eq!(scan.predicate, Some(predicate));
     }
 }
```
**Review thread** on `let all_fields = self.all_fields.clone();`:

- Clone needed because now we process the selected column types separately for each commit, where the normal log replay scan does it all in one shot?
- Ah, good point. The reason is really that I don't want to hold a reference to `all_fields` in `TableChangesScan` because the iterator needs to be free from lifetimes. But really all I need is to `Arc` it; now we don't need to clone the Vec. Note that the existing `Scan::execute` still borrows `self`, as you can see with that anonymous lifetime `'_`. I was planning on opening a follow-up PR to take that out too.
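A minimal sketch of the design the reply describes (hypothetical types, not the kernel's `Scan`): keeping the shared field list behind an `Arc` lets `execute` return an iterator that owns everything it touches, instead of one tied to a borrow of `self`.

```rust
use std::sync::Arc;

struct Scan {
    // Arc<[T]> instead of Vec<T>: cloning is a refcount bump, not a deep copy.
    all_fields: Arc<[u32]>,
}

impl Scan {
    fn execute(&self) -> impl Iterator<Item = u32> {
        // Clone the Arc so the returned iterator owns its data and carries no
        // borrow of `self`, sidestepping the anonymous `'_` lifetime that a
        // `&self.all_fields` capture would impose.
        let fields = Arc::clone(&self.all_fields);
        (0..fields.len()).map(move |i| fields[i] * 2)
    }
}

fn main() {
    let scan = Scan { all_fields: vec![1, 2, 3].into() };
    let doubled: Vec<u32> = scan.execute().collect();
    assert_eq!(doubled, vec![2, 4, 6]);
}
```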